flowcontrol.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "fmt"
  21. "math"
  22. "sync"
  23. "sync/atomic"
  24. )
  25. // writeQuota is a soft limit on the amount of data a stream can
  26. // schedule before some of it is written out.
  27. type writeQuota struct {
  28. quota int32
  29. // get waits on read from when quota goes less than or equal to zero.
  30. // replenish writes on it when quota goes positive again.
  31. ch chan struct{}
  32. // done is triggered in error case.
  33. done <-chan struct{}
  34. // replenish is called by loopyWriter to give quota back to.
  35. // It is implemented as a field so that it can be updated
  36. // by tests.
  37. replenish func(n int)
  38. }
  39. func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
  40. w := &writeQuota{
  41. quota: sz,
  42. ch: make(chan struct{}, 1),
  43. done: done,
  44. }
  45. w.replenish = w.realReplenish
  46. return w
  47. }
  48. func (w *writeQuota) get(sz int32) error {
  49. for {
  50. if atomic.LoadInt32(&w.quota) > 0 {
  51. atomic.AddInt32(&w.quota, -sz)
  52. return nil
  53. }
  54. select {
  55. case <-w.ch:
  56. continue
  57. case <-w.done:
  58. return errStreamDone
  59. }
  60. }
  61. }
  62. func (w *writeQuota) realReplenish(n int) {
  63. sz := int32(n)
  64. a := atomic.AddInt32(&w.quota, sz)
  65. b := a - sz
  66. if b <= 0 && a > 0 {
  67. select {
  68. case w.ch <- struct{}{}:
  69. default:
  70. }
  71. }
  72. }
  73. type trInFlow struct {
  74. limit uint32
  75. unacked uint32
  76. effectiveWindowSize uint32
  77. }
  78. func (f *trInFlow) newLimit(n uint32) uint32 {
  79. d := n - f.limit
  80. f.limit = n
  81. f.updateEffectiveWindowSize()
  82. return d
  83. }
  84. func (f *trInFlow) onData(n uint32) uint32 {
  85. f.unacked += n
  86. if f.unacked >= f.limit/4 {
  87. w := f.unacked
  88. f.unacked = 0
  89. f.updateEffectiveWindowSize()
  90. return w
  91. }
  92. f.updateEffectiveWindowSize()
  93. return 0
  94. }
  95. func (f *trInFlow) reset() uint32 {
  96. w := f.unacked
  97. f.unacked = 0
  98. f.updateEffectiveWindowSize()
  99. return w
  100. }
  101. func (f *trInFlow) updateEffectiveWindowSize() {
  102. atomic.StoreUint32(&f.effectiveWindowSize, f.limit-f.unacked)
  103. }
  104. func (f *trInFlow) getSize() uint32 {
  105. return atomic.LoadUint32(&f.effectiveWindowSize)
  106. }
  107. // TODO(mmukhi): Simplify this code.
  108. // inFlow deals with inbound flow control
  109. type inFlow struct {
  110. mu sync.Mutex
  111. // The inbound flow control limit for pending data.
  112. limit uint32
  113. // pendingData is the overall data which have been received but not been
  114. // consumed by applications.
  115. pendingData uint32
  116. // The amount of data the application has consumed but grpc has not sent
  117. // window update for them. Used to reduce window update frequency.
  118. pendingUpdate uint32
  119. // delta is the extra window update given by receiver when an application
  120. // is reading data bigger in size than the inFlow limit.
  121. delta uint32
  122. }
  123. // newLimit updates the inflow window to a new value n.
  124. // It assumes that n is always greater than the old limit.
  125. func (f *inFlow) newLimit(n uint32) uint32 {
  126. f.mu.Lock()
  127. d := n - f.limit
  128. f.limit = n
  129. f.mu.Unlock()
  130. return d
  131. }
  132. func (f *inFlow) maybeAdjust(n uint32) uint32 {
  133. if n > uint32(math.MaxInt32) {
  134. n = uint32(math.MaxInt32)
  135. }
  136. f.mu.Lock()
  137. defer f.mu.Unlock()
  138. // estSenderQuota is the receiver's view of the maximum number of bytes the sender
  139. // can send without a window update.
  140. estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
  141. // estUntransmittedData is the maximum number of bytes the sends might not have put
  142. // on the wire yet. A value of 0 or less means that we have already received all or
  143. // more bytes than the application is requesting to read.
  144. estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative.
  145. // This implies that unless we send a window update, the sender won't be able to send all the bytes
  146. // for this message. Therefore we must send an update over the limit since there's an active read
  147. // request from the application.
  148. if estUntransmittedData > estSenderQuota {
  149. // Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec.
  150. if f.limit+n > maxWindowSize {
  151. f.delta = maxWindowSize - f.limit
  152. } else {
  153. // Send a window update for the whole message and not just the difference between
  154. // estUntransmittedData and estSenderQuota. This will be helpful in case the message
  155. // is padded; We will fallback on the current available window(at least a 1/4th of the limit).
  156. f.delta = n
  157. }
  158. return f.delta
  159. }
  160. return 0
  161. }
  162. // onData is invoked when some data frame is received. It updates pendingData.
  163. func (f *inFlow) onData(n uint32) error {
  164. f.mu.Lock()
  165. f.pendingData += n
  166. if f.pendingData+f.pendingUpdate > f.limit+f.delta {
  167. limit := f.limit
  168. rcvd := f.pendingData + f.pendingUpdate
  169. f.mu.Unlock()
  170. return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)
  171. }
  172. f.mu.Unlock()
  173. return nil
  174. }
  175. // onRead is invoked when the application reads the data. It returns the window size
  176. // to be sent to the peer.
  177. func (f *inFlow) onRead(n uint32) uint32 {
  178. f.mu.Lock()
  179. if f.pendingData == 0 {
  180. f.mu.Unlock()
  181. return 0
  182. }
  183. f.pendingData -= n
  184. if n > f.delta {
  185. n -= f.delta
  186. f.delta = 0
  187. } else {
  188. f.delta -= n
  189. n = 0
  190. }
  191. f.pendingUpdate += n
  192. if f.pendingUpdate >= f.limit/4 {
  193. wu := f.pendingUpdate
  194. f.pendingUpdate = 0
  195. f.mu.Unlock()
  196. return wu
  197. }
  198. f.mu.Unlock()
  199. return 0
  200. }