control.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "fmt"
  21. "math"
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. "golang.org/x/net/http2"
  26. "golang.org/x/net/http2/hpack"
  27. )
  28. const (
  29. // The default value of flow control window size in HTTP2 spec.
  30. defaultWindowSize = 65535
  31. // The initial window size for flow control.
  32. initialWindowSize = defaultWindowSize // for an RPC
  33. infinity = time.Duration(math.MaxInt64)
  34. defaultClientKeepaliveTime = infinity
  35. defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
  36. defaultMaxStreamsClient = 100
  37. defaultMaxConnectionIdle = infinity
  38. defaultMaxConnectionAge = infinity
  39. defaultMaxConnectionAgeGrace = infinity
  40. defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
  41. defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
  42. defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
  43. // max window limit set by HTTP2 Specs.
  44. maxWindowSize = math.MaxInt32
  45. // defaultLocalSendQuota sets is default value for number of data
  46. // bytes that each stream can schedule before some of it being
  47. // flushed out.
  48. defaultLocalSendQuota = 64 * 1024
  49. )
  50. // The following defines various control items which could flow through
  51. // the control buffer of transport. They represent different aspects of
  52. // control tasks, e.g., flow control, settings, streaming resetting, etc.
  53. type headerFrame struct {
  54. streamID uint32
  55. hf []hpack.HeaderField
  56. endStream bool
  57. }
  58. func (*headerFrame) item() {}
  59. type continuationFrame struct {
  60. streamID uint32
  61. endHeaders bool
  62. headerBlockFragment []byte
  63. }
  64. type dataFrame struct {
  65. streamID uint32
  66. endStream bool
  67. d []byte
  68. f func()
  69. }
  70. func (*dataFrame) item() {}
  71. func (*continuationFrame) item() {}
  72. type windowUpdate struct {
  73. streamID uint32
  74. increment uint32
  75. }
  76. func (*windowUpdate) item() {}
  77. type settings struct {
  78. ack bool
  79. ss []http2.Setting
  80. }
  81. func (*settings) item() {}
  82. type resetStream struct {
  83. streamID uint32
  84. code http2.ErrCode
  85. }
  86. func (*resetStream) item() {}
  87. type goAway struct {
  88. code http2.ErrCode
  89. debugData []byte
  90. headsUp bool
  91. closeConn bool
  92. }
  93. func (*goAway) item() {}
  94. type flushIO struct {
  95. }
  96. func (*flushIO) item() {}
  97. type ping struct {
  98. ack bool
  99. data [8]byte
  100. }
  101. func (*ping) item() {}
  102. // quotaPool is a pool which accumulates the quota and sends it to acquire()
  103. // when it is available.
  104. type quotaPool struct {
  105. c chan int
  106. mu sync.Mutex
  107. version uint32
  108. quota int
  109. }
  110. // newQuotaPool creates a quotaPool which has quota q available to consume.
  111. func newQuotaPool(q int) *quotaPool {
  112. qb := &quotaPool{
  113. c: make(chan int, 1),
  114. }
  115. if q > 0 {
  116. qb.c <- q
  117. } else {
  118. qb.quota = q
  119. }
  120. return qb
  121. }
  122. // add cancels the pending quota sent on acquired, incremented by v and sends
  123. // it back on acquire.
  124. func (qb *quotaPool) add(v int) {
  125. qb.mu.Lock()
  126. defer qb.mu.Unlock()
  127. qb.lockedAdd(v)
  128. }
  129. func (qb *quotaPool) lockedAdd(v int) {
  130. select {
  131. case n := <-qb.c:
  132. qb.quota += n
  133. default:
  134. }
  135. qb.quota += v
  136. if qb.quota <= 0 {
  137. return
  138. }
  139. // After the pool has been created, this is the only place that sends on
  140. // the channel. Since mu is held at this point and any quota that was sent
  141. // on the channel has been retrieved, we know that this code will always
  142. // place any positive quota value on the channel.
  143. select {
  144. case qb.c <- qb.quota:
  145. qb.quota = 0
  146. default:
  147. }
  148. }
  149. func (qb *quotaPool) addAndUpdate(v int) {
  150. qb.mu.Lock()
  151. defer qb.mu.Unlock()
  152. qb.lockedAdd(v)
  153. // Update the version only after having added to the quota
  154. // so that if acquireWithVesrion sees the new vesrion it is
  155. // guaranteed to have seen the updated quota.
  156. // Also, still keep this inside of the lock, so that when
  157. // compareAndExecute is processing, this function doesn't
  158. // get executed partially (quota gets updated but the version
  159. // doesn't).
  160. atomic.AddUint32(&(qb.version), 1)
  161. }
  162. func (qb *quotaPool) acquireWithVersion() (<-chan int, uint32) {
  163. return qb.c, atomic.LoadUint32(&(qb.version))
  164. }
  165. func (qb *quotaPool) compareAndExecute(version uint32, success, failure func()) bool {
  166. qb.mu.Lock()
  167. defer qb.mu.Unlock()
  168. if version == atomic.LoadUint32(&(qb.version)) {
  169. success()
  170. return true
  171. }
  172. failure()
  173. return false
  174. }
  175. // acquire returns the channel on which available quota amounts are sent.
  176. func (qb *quotaPool) acquire() <-chan int {
  177. return qb.c
  178. }
  179. // inFlow deals with inbound flow control
  180. type inFlow struct {
  181. mu sync.Mutex
  182. // The inbound flow control limit for pending data.
  183. limit uint32
  184. // pendingData is the overall data which have been received but not been
  185. // consumed by applications.
  186. pendingData uint32
  187. // The amount of data the application has consumed but grpc has not sent
  188. // window update for them. Used to reduce window update frequency.
  189. pendingUpdate uint32
  190. // delta is the extra window update given by receiver when an application
  191. // is reading data bigger in size than the inFlow limit.
  192. delta uint32
  193. }
  194. // newLimit updates the inflow window to a new value n.
  195. // It assumes that n is always greater than the old limit.
  196. func (f *inFlow) newLimit(n uint32) uint32 {
  197. f.mu.Lock()
  198. defer f.mu.Unlock()
  199. d := n - f.limit
  200. f.limit = n
  201. return d
  202. }
  203. func (f *inFlow) maybeAdjust(n uint32) uint32 {
  204. if n > uint32(math.MaxInt32) {
  205. n = uint32(math.MaxInt32)
  206. }
  207. f.mu.Lock()
  208. defer f.mu.Unlock()
  209. // estSenderQuota is the receiver's view of the maximum number of bytes the sender
  210. // can send without a window update.
  211. estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
  212. // estUntransmittedData is the maximum number of bytes the sends might not have put
  213. // on the wire yet. A value of 0 or less means that we have already received all or
  214. // more bytes than the application is requesting to read.
  215. estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative.
  216. // This implies that unless we send a window update, the sender won't be able to send all the bytes
  217. // for this message. Therefore we must send an update over the limit since there's an active read
  218. // request from the application.
  219. if estUntransmittedData > estSenderQuota {
  220. // Sender's window shouldn't go more than 2^31 - 1 as speecified in the HTTP spec.
  221. if f.limit+n > maxWindowSize {
  222. f.delta = maxWindowSize - f.limit
  223. } else {
  224. // Send a window update for the whole message and not just the difference between
  225. // estUntransmittedData and estSenderQuota. This will be helpful in case the message
  226. // is padded; We will fallback on the current available window(at least a 1/4th of the limit).
  227. f.delta = n
  228. }
  229. return f.delta
  230. }
  231. return 0
  232. }
  233. // onData is invoked when some data frame is received. It updates pendingData.
  234. func (f *inFlow) onData(n uint32) error {
  235. f.mu.Lock()
  236. defer f.mu.Unlock()
  237. f.pendingData += n
  238. if f.pendingData+f.pendingUpdate > f.limit+f.delta {
  239. return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
  240. }
  241. return nil
  242. }
  243. // onRead is invoked when the application reads the data. It returns the window size
  244. // to be sent to the peer.
  245. func (f *inFlow) onRead(n uint32) uint32 {
  246. f.mu.Lock()
  247. defer f.mu.Unlock()
  248. if f.pendingData == 0 {
  249. return 0
  250. }
  251. f.pendingData -= n
  252. if n > f.delta {
  253. n -= f.delta
  254. f.delta = 0
  255. } else {
  256. f.delta -= n
  257. n = 0
  258. }
  259. f.pendingUpdate += n
  260. if f.pendingUpdate >= f.limit/4 {
  261. wu := f.pendingUpdate
  262. f.pendingUpdate = 0
  263. return wu
  264. }
  265. return 0
  266. }
  267. func (f *inFlow) resetPendingUpdate() uint32 {
  268. f.mu.Lock()
  269. defer f.mu.Unlock()
  270. n := f.pendingUpdate
  271. f.pendingUpdate = 0
  272. return n
  273. }