pipe.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. // Copyright 2014 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package http2
  5. import (
  6. "errors"
  7. "io"
  8. "sync"
  9. )
  10. // pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
  11. // io.Pipe except there are no PipeReader/PipeWriter halves, and the
  12. // underlying buffer is an interface. (io.Pipe is always unbuffered)
  13. type pipe struct {
  14. mu sync.Mutex
  15. c sync.Cond // c.L must point to
  16. b pipeBuffer
  17. err error // read error once empty. non-nil means closed.
  18. donec chan struct{} // closed on error
  19. }
  // pipeBuffer is the storage a pipe reads from and writes to.
  type pipeBuffer interface {
  	io.Reader
  	io.Writer

  	// Len is consulted by pipe.Read: a result greater than zero is
  	// treated as "data is available to read right now".
  	Len() int
  }
  25. // Read waits until data is available and copies bytes
  26. // from the buffer into p.
  27. func (p *pipe) Read(d []byte) (n int, err error) {
  28. p.mu.Lock()
  29. defer p.mu.Unlock()
  30. if p.c.L == nil {
  31. p.c.L = &p.mu
  32. }
  33. for {
  34. if p.b.Len() > 0 {
  35. return p.b.Read(d)
  36. }
  37. if p.err != nil {
  38. return 0, p.err
  39. }
  40. p.c.Wait()
  41. }
  42. }
  43. var errClosedPipeWrite = errors.New("write on closed buffer")
  44. // Write copies bytes from p into the buffer and wakes a reader.
  45. // It is an error to write more data than the buffer can hold.
  46. func (p *pipe) Write(d []byte) (n int, err error) {
  47. p.mu.Lock()
  48. defer p.mu.Unlock()
  49. if p.c.L == nil {
  50. p.c.L = &p.mu
  51. }
  52. defer p.c.Signal()
  53. if p.err != nil {
  54. return 0, errClosedPipeWrite
  55. }
  56. return p.b.Write(d)
  57. }
  58. // CloseWithError causes Reads to wake up and return the
  59. // provided err after all data has been read.
  60. //
  61. // The error must be non-nil.
  62. func (p *pipe) CloseWithError(err error) {
  63. if err == nil {
  64. panic("CloseWithError must be non-nil")
  65. }
  66. p.mu.Lock()
  67. defer p.mu.Unlock()
  68. if p.c.L == nil {
  69. p.c.L = &p.mu
  70. }
  71. defer p.c.Signal()
  72. if p.err == nil {
  73. p.err = err
  74. if p.donec != nil {
  75. close(p.donec)
  76. }
  77. }
  78. }
  79. // Err returns the error (if any) first set with CloseWithError.
  80. // This is the error which will be returned after the reader is exhausted.
  81. func (p *pipe) Err() error {
  82. p.mu.Lock()
  83. defer p.mu.Unlock()
  84. return p.err
  85. }
  86. // Done returns a channel which is closed if and when this pipe is closed
  87. // with CloseWithError.
  88. func (p *pipe) Done() <-chan struct{} {
  89. p.mu.Lock()
  90. defer p.mu.Unlock()
  91. if p.donec == nil {
  92. p.donec = make(chan struct{})
  93. if p.err != nil {
  94. // Already hit an error.
  95. close(p.donec)
  96. }
  97. }
  98. return p.donec
  99. }