flow.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. // Copyright 2014 The Go Authors.
  2. // See https://code.google.com/p/go/source/browse/CONTRIBUTORS
  3. // Licensed under the same terms as Go itself:
  4. // https://code.google.com/p/go/source/browse/LICENSE
  5. // Flow control
  6. package http2
  7. import "sync"
  8. // flow is the flow control window's counting semaphore.
  9. type flow struct {
  10. c *sync.Cond // protects size
  11. size int32
  12. closed bool
  13. }
  14. func newFlow(n int32) *flow {
  15. return &flow{
  16. c: sync.NewCond(new(sync.Mutex)),
  17. size: n,
  18. }
  19. }
  20. // cur returns the current number of bytes allow to write. Obviously
  21. // it's not safe to call this and assume acquiring that number of
  22. // bytes from the acquire method won't be block in the presence of
  23. // concurrent acquisitions.
  24. func (f *flow) cur() int32 {
  25. f.c.L.Lock()
  26. defer f.c.L.Unlock()
  27. return f.size
  28. }
  29. // wait waits for between 1 and n bytes (inclusive) to be available
  30. // and returns the number of quota bytes decremented from the quota
  31. // and allowed to be written. The returned value will be 0 iff the
  32. // stream has been killed.
  33. func (f *flow) wait(n int32) (got int32) {
  34. if n < 0 {
  35. panic("negative acquire")
  36. }
  37. f.c.L.Lock()
  38. defer f.c.L.Unlock()
  39. for {
  40. if f.closed {
  41. return 0
  42. }
  43. if f.size >= 1 {
  44. got = f.size
  45. if got > n {
  46. got = n
  47. }
  48. f.size -= got
  49. return got
  50. }
  51. f.c.Wait()
  52. }
  53. }
  54. // add adds n bytes (positive or negative) to the flow control window.
  55. // It returns false if the sum would exceed 2^31-1.
  56. func (f *flow) add(n int32) bool {
  57. f.c.L.Lock()
  58. defer f.c.L.Unlock()
  59. remain := (1<<31 - 1) - f.size
  60. if n > remain {
  61. return false
  62. }
  63. f.size += n
  64. f.c.Broadcast()
  65. return true
  66. }
  67. // close marks the flow as closed, meaning everybody gets all the
  68. // tokens they want, because everything else will fail anyway.
  69. func (f *flow) close() {
  70. f.c.L.Lock()
  71. defer f.c.L.Unlock()
  72. f.closed = true
  73. f.c.Broadcast()
  74. }