123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- // Copyright 2014 The Go Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- package http2
- import (
- "errors"
- "io"
- "sync"
- )
- // pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
- // io.Pipe except there are no PipeReader/PipeWriter halves, and the
- // underlying buffer is an interface. (io.Pipe is always unbuffered)
- type pipe struct {
- mu sync.Mutex
- c sync.Cond // c.L lazily initialized to &p.mu
- b pipeBuffer // nil when done reading
- unread int // bytes unread when done
- err error // read error once empty. non-nil means closed.
- breakErr error // immediate read error (caller doesn't see rest of b)
- donec chan struct{} // closed on error
- readFn func() // optional code to run in Read before error
- }
- type pipeBuffer interface {
- Len() int
- io.Writer
- io.Reader
- }
- func (p *pipe) Len() int {
- p.mu.Lock()
- defer p.mu.Unlock()
- if p.b == nil {
- return p.unread
- }
- return p.b.Len()
- }
- // Read waits until data is available and copies bytes
- // from the buffer into p.
- func (p *pipe) Read(d []byte) (n int, err error) {
- p.mu.Lock()
- defer p.mu.Unlock()
- if p.c.L == nil {
- p.c.L = &p.mu
- }
- for {
- if p.breakErr != nil {
- return 0, p.breakErr
- }
- if p.b != nil && p.b.Len() > 0 {
- return p.b.Read(d)
- }
- if p.err != nil {
- if p.readFn != nil {
- p.readFn() // e.g. copy trailers
- p.readFn = nil // not sticky like p.err
- }
- p.b = nil
- return 0, p.err
- }
- p.c.Wait()
- }
- }
- var errClosedPipeWrite = errors.New("write on closed buffer")
- // Write copies bytes from p into the buffer and wakes a reader.
- // It is an error to write more data than the buffer can hold.
- func (p *pipe) Write(d []byte) (n int, err error) {
- p.mu.Lock()
- defer p.mu.Unlock()
- if p.c.L == nil {
- p.c.L = &p.mu
- }
- defer p.c.Signal()
- if p.err != nil {
- return 0, errClosedPipeWrite
- }
- if p.breakErr != nil {
- p.unread += len(d)
- return len(d), nil // discard when there is no reader
- }
- return p.b.Write(d)
- }
- // CloseWithError causes the next Read (waking up a current blocked
- // Read if needed) to return the provided err after all data has been
- // read.
- //
- // The error must be non-nil.
- func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
- // BreakWithError causes the next Read (waking up a current blocked
- // Read if needed) to return the provided err immediately, without
- // waiting for unread data.
- func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
- // closeWithErrorAndCode is like CloseWithError but also sets some code to run
- // in the caller's goroutine before returning the error.
- func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
- func (p *pipe) closeWithError(dst *error, err error, fn func()) {
- if err == nil {
- panic("err must be non-nil")
- }
- p.mu.Lock()
- defer p.mu.Unlock()
- if p.c.L == nil {
- p.c.L = &p.mu
- }
- defer p.c.Signal()
- if *dst != nil {
- // Already been done.
- return
- }
- p.readFn = fn
- if dst == &p.breakErr {
- if p.b != nil {
- p.unread += p.b.Len()
- }
- p.b = nil
- }
- *dst = err
- p.closeDoneLocked()
- }
- // requires p.mu be held.
- func (p *pipe) closeDoneLocked() {
- if p.donec == nil {
- return
- }
- // Close if unclosed. This isn't racy since we always
- // hold p.mu while closing.
- select {
- case <-p.donec:
- default:
- close(p.donec)
- }
- }
- // Err returns the error (if any) first set by BreakWithError or CloseWithError.
- func (p *pipe) Err() error {
- p.mu.Lock()
- defer p.mu.Unlock()
- if p.breakErr != nil {
- return p.breakErr
- }
- return p.err
- }
- // Done returns a channel which is closed if and when this pipe is closed
- // with CloseWithError.
- func (p *pipe) Done() <-chan struct{} {
- p.mu.Lock()
- defer p.mu.Unlock()
- if p.donec == nil {
- p.donec = make(chan struct{})
- if p.err != nil || p.breakErr != nil {
- // Already hit an error.
- p.closeDoneLocked()
- }
- }
- return p.donec
- }
|