pipe.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. // Copyright 2014 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package http2
  5. import (
  6. "errors"
  7. "io"
  8. "sync"
  9. )
// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
// io.Pipe except there are no PipeReader/PipeWriter halves, and the
// underlying buffer is an interface. (io.Pipe is always unbuffered)
//
// Every method in this file acquires mu before touching any other
// field. Read, Write, and closeWithErrorAndCode additionally point
// c.L at mu the first time they run.
type pipe struct {
	mu     sync.Mutex    // held by every method while it reads or writes the fields below
	c      sync.Cond     // c.L must point to mu; set lazily by the methods that wait on or signal c
	b      pipeBuffer    // buffered data; filled by Write, drained by Read
	err    error         // read error once empty. non-nil means closed.
	donec  chan struct{} // closed on error; allocated on the first call to Done
	readFn func()        // optional code to run in Read before error
}
  21. type pipeBuffer interface {
  22. Len() int
  23. io.Writer
  24. io.Reader
  25. }
  26. // Read waits until data is available and copies bytes
  27. // from the buffer into p.
  28. func (p *pipe) Read(d []byte) (n int, err error) {
  29. p.mu.Lock()
  30. defer p.mu.Unlock()
  31. if p.c.L == nil {
  32. p.c.L = &p.mu
  33. }
  34. for {
  35. if p.b.Len() > 0 {
  36. return p.b.Read(d)
  37. }
  38. if p.err != nil {
  39. if p.readFn != nil {
  40. p.readFn() // e.g. copy trailers
  41. p.readFn = nil // not sticky like p.err
  42. }
  43. return 0, p.err
  44. }
  45. p.c.Wait()
  46. }
  47. }
  48. var errClosedPipeWrite = errors.New("write on closed buffer")
  49. // Write copies bytes from p into the buffer and wakes a reader.
  50. // It is an error to write more data than the buffer can hold.
  51. func (p *pipe) Write(d []byte) (n int, err error) {
  52. p.mu.Lock()
  53. defer p.mu.Unlock()
  54. if p.c.L == nil {
  55. p.c.L = &p.mu
  56. }
  57. defer p.c.Signal()
  58. if p.err != nil {
  59. return 0, errClosedPipeWrite
  60. }
  61. return p.b.Write(d)
  62. }
  63. // CloseWithError causes the next Read (waking up a current blocked
  64. // Read if needed) to return the provided err after all data has been
  65. // read.
  66. //
  67. // The error must be non-nil.
  68. func (p *pipe) CloseWithError(err error) { p.closeWithErrorAndCode(err, nil) }
  69. // closeWithErrorAndCode is like CloseWithError but also sets some code to run
  70. // in the caller's goroutine before returning the error.
  71. func (p *pipe) closeWithErrorAndCode(err error, fn func()) {
  72. if err == nil {
  73. panic("CloseWithError err must be non-nil")
  74. }
  75. p.mu.Lock()
  76. defer p.mu.Unlock()
  77. if p.c.L == nil {
  78. p.c.L = &p.mu
  79. }
  80. defer p.c.Signal()
  81. if p.err != nil {
  82. // Already been done.
  83. return
  84. }
  85. p.readFn = fn
  86. p.err = err
  87. if p.donec != nil {
  88. close(p.donec)
  89. }
  90. }
  91. // Err returns the error (if any) first set with CloseWithError.
  92. // This is the error which will be returned after the reader is exhausted.
  93. func (p *pipe) Err() error {
  94. p.mu.Lock()
  95. defer p.mu.Unlock()
  96. return p.err
  97. }
  98. // Done returns a channel which is closed if and when this pipe is closed
  99. // with CloseWithError.
  100. func (p *pipe) Done() <-chan struct{} {
  101. p.mu.Lock()
  102. defer p.mu.Unlock()
  103. if p.donec == nil {
  104. p.donec = make(chan struct{})
  105. if p.err != nil {
  106. // Already hit an error.
  107. close(p.donec)
  108. }
  109. }
  110. return p.donec
  111. }