|
@@ -14,12 +14,13 @@ import (
|
|
|
// io.Pipe except there are no PipeReader/PipeWriter halves, and the
|
|
// io.Pipe except there are no PipeReader/PipeWriter halves, and the
|
|
|
// underlying buffer is an interface. (io.Pipe is always unbuffered)
|
|
// underlying buffer is an interface. (io.Pipe is always unbuffered)
|
|
|
type pipe struct {
|
|
type pipe struct {
|
|
|
- mu sync.Mutex
|
|
|
|
|
- c sync.Cond // c.L must point to
|
|
|
|
|
- b pipeBuffer
|
|
|
|
|
- err error // read error once empty. non-nil means closed.
|
|
|
|
|
- donec chan struct{} // closed on error
|
|
|
|
|
- readFn func() // optional code to run in Read before error
|
|
|
|
|
|
|
+ mu sync.Mutex
|
|
|
|
|
+ c sync.Cond // c.L lazily initialized to &p.mu
|
|
|
|
|
+ b pipeBuffer
|
|
|
|
|
+ err error // read error once empty. non-nil means closed.
|
|
|
|
|
+ breakErr error // immediate read error (caller doesn't see rest of b)
|
|
|
|
|
+ donec chan struct{} // closed on error
|
|
|
|
|
+ readFn func() // optional code to run in Read before error
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
type pipeBuffer interface {
|
|
type pipeBuffer interface {
|
|
@@ -37,6 +38,9 @@ func (p *pipe) Read(d []byte) (n int, err error) {
|
|
|
p.c.L = &p.mu
|
|
p.c.L = &p.mu
|
|
|
}
|
|
}
|
|
|
for {
|
|
for {
|
|
|
|
|
+ if p.breakErr != nil {
|
|
|
|
|
+ return 0, p.breakErr
|
|
|
|
|
+ }
|
|
|
if p.b.Len() > 0 {
|
|
if p.b.Len() > 0 {
|
|
|
return p.b.Read(d)
|
|
return p.b.Read(d)
|
|
|
}
|
|
}
|
|
@@ -73,13 +77,20 @@ func (p *pipe) Write(d []byte) (n int, err error) {
|
|
|
// read.
|
|
// read.
|
|
|
//
|
|
//
|
|
|
// The error must be non-nil.
|
|
// The error must be non-nil.
|
|
|
-func (p *pipe) CloseWithError(err error) { p.closeWithErrorAndCode(err, nil) }
|
|
|
|
|
|
|
+func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
|
|
|
|
|
+
|
|
|
|
|
+// BreakWithError causes the next Read (waking up a current blocked
|
|
|
|
|
+// Read if needed) to return the provided err immediately, without
|
|
|
|
|
+// waiting for unread data.
|
|
|
|
|
+func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
|
|
|
|
|
|
|
|
// closeWithErrorAndCode is like CloseWithError but also sets some code to run
|
|
// closeWithErrorAndCode is like CloseWithError but also sets some code to run
|
|
|
// in the caller's goroutine before returning the error.
|
|
// in the caller's goroutine before returning the error.
|
|
|
-func (p *pipe) closeWithErrorAndCode(err error, fn func()) {
|
|
|
|
|
|
|
+func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
|
|
|
|
|
+
|
|
|
|
|
+func (p *pipe) closeWithError(dst *error, err error, fn func()) {
|
|
|
if err == nil {
|
|
if err == nil {
|
|
|
- panic("CloseWithError err must be non-nil")
|
|
|
|
|
|
|
+ panic("err must be non-nil")
|
|
|
}
|
|
}
|
|
|
p.mu.Lock()
|
|
p.mu.Lock()
|
|
|
defer p.mu.Unlock()
|
|
defer p.mu.Unlock()
|
|
@@ -87,22 +98,36 @@ func (p *pipe) closeWithErrorAndCode(err error, fn func()) {
|
|
|
p.c.L = &p.mu
|
|
p.c.L = &p.mu
|
|
|
}
|
|
}
|
|
|
defer p.c.Signal()
|
|
defer p.c.Signal()
|
|
|
- if p.err != nil {
|
|
|
|
|
|
|
+ if *dst != nil {
|
|
|
// Already been done.
|
|
// Already been done.
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
p.readFn = fn
|
|
p.readFn = fn
|
|
|
- p.err = err
|
|
|
|
|
- if p.donec != nil {
|
|
|
|
|
|
|
+ *dst = err
|
|
|
|
|
+ p.closeDoneLocked()
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// requires p.mu be held.
|
|
|
|
|
+func (p *pipe) closeDoneLocked() {
|
|
|
|
|
+ if p.donec == nil {
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ // Close if unclosed. This isn't racy since we always
|
|
|
|
|
+ // hold p.mu while closing.
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-p.donec:
|
|
|
|
|
+ default:
|
|
|
close(p.donec)
|
|
close(p.donec)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// Err returns the error (if any) first set with CloseWithError.
|
|
|
|
|
-// This is the error which will be returned after the reader is exhausted.
|
|
|
|
|
|
|
+// Err returns the error (if any) first set by BreakWithError or CloseWithError.
|
|
|
func (p *pipe) Err() error {
|
|
func (p *pipe) Err() error {
|
|
|
p.mu.Lock()
|
|
p.mu.Lock()
|
|
|
defer p.mu.Unlock()
|
|
defer p.mu.Unlock()
|
|
|
|
|
+ if p.breakErr != nil {
|
|
|
|
|
+ return p.breakErr
|
|
|
|
|
+ }
|
|
|
return p.err
|
|
return p.err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -113,9 +138,9 @@ func (p *pipe) Done() <-chan struct{} {
|
|
|
defer p.mu.Unlock()
|
|
defer p.mu.Unlock()
|
|
|
if p.donec == nil {
|
|
if p.donec == nil {
|
|
|
p.donec = make(chan struct{})
|
|
p.donec = make(chan struct{})
|
|
|
- if p.err != nil {
|
|
|
|
|
|
|
+ if p.err != nil || p.breakErr != nil {
|
|
|
// Already hit an error.
|
|
// Already hit an error.
|
|
|
- close(p.donec)
|
|
|
|
|
|
|
+ p.closeDoneLocked()
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
return p.donec
|
|
return p.donec
|