flow.go 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
// Copyright 2014 The Go Authors.
// See https://code.google.com/p/go/source/browse/CONTRIBUTORS
// Licensed under the same terms as Go itself:
// https://code.google.com/p/go/source/browse/LICENSE

// Flow control
  6. package http2
  7. import "sync"
// flow is the flow control window's counting semaphore.
//
// size holds the number of bytes that may currently be written.
// acquire blocks on c until enough bytes are available or the flow
// has been closed; add and close broadcast on c to wake blocked
// callers. All access to size and closed happens with c.L held.
type flow struct {
	c      *sync.Cond // c.L guards size and closed; acquire waits on c
	size   int32      // bytes currently available in the window
	closed bool       // set by close; once true, acquire returns without waiting
}
  14. func newFlow(n int32) *flow {
  15. return &flow{
  16. c: sync.NewCond(new(sync.Mutex)),
  17. size: n,
  18. }
  19. }
  20. // cur returns the current number of bytes allow to write. Obviously
  21. // it's not safe to call this and assume acquiring that number of
  22. // bytes from the acquire method won't be block in the presence of
  23. // concurrent acquisitions.
  24. func (f *flow) cur() int32 {
  25. f.c.L.Lock()
  26. defer f.c.L.Unlock()
  27. return f.size
  28. }
  29. // acquire decrements the flow control window by n bytes, blocking
  30. // until they're available in the window.
  31. // The return value is only interesting for tests.
  32. func (f *flow) acquire(n int32) (waited int) {
  33. if n < 0 {
  34. panic("negative acquire")
  35. }
  36. f.c.L.Lock()
  37. defer f.c.L.Unlock()
  38. for {
  39. if f.closed {
  40. return
  41. }
  42. if f.size >= n {
  43. f.size -= n
  44. return
  45. }
  46. waited++
  47. f.c.Wait()
  48. }
  49. }
  50. // add adds n bytes (positive or negative) to the flow control window.
  51. // It returns false if the sum would exceed 2^31-1.
  52. func (f *flow) add(n int32) bool {
  53. f.c.L.Lock()
  54. defer f.c.L.Unlock()
  55. remain := (1<<31 - 1) - f.size
  56. if n > remain {
  57. return false
  58. }
  59. f.size += n
  60. f.c.Broadcast()
  61. return true
  62. }
  63. // close marks the flow as closed, meaning everybody gets all the
  64. // tokens they want, because everything else will fail anyway.
  65. func (f *flow) close() {
  66. f.c.L.Lock()
  67. defer f.c.L.Unlock()
  68. f.closed = true
  69. f.c.Broadcast()
  70. }