writesched.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. // Copyright 2014 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // See https://code.google.com/p/go/source/browse/CONTRIBUTORS
  5. // Licensed under the same terms as Go itself:
  6. // https://code.google.com/p/go/source/browse/LICENSE
  7. package http2
  8. // frameWriteMsg is a request to write a frame.
  9. type frameWriteMsg struct {
  10. // write is the interface value that does the writing, once the
  11. // writeScheduler (below) has decided to select this frame
  12. // to write. The write functions are all defined in write.go.
  13. write writeFramer
  14. stream *stream // used for prioritization. nil for non-stream frames.
  15. // done, if non-nil, must be a buffered channel with space for
  16. // 1 message and is sent the return value from write (or an
  17. // earlier error) when the frame has been written.
  18. done chan error
  19. }
  20. // writeScheduler tracks pending frames to write, priorities, and decides
  21. // the next one to use. It is not thread-safe.
  22. type writeScheduler struct {
  23. // zero are frames not associated with a specific stream.
  24. // They're sent before any stream-specific freams.
  25. zero writeQueue
  26. // maxFrameSize is the maximum size of a DATA frame
  27. // we'll write. Must be non-zero and between 16K-16M.
  28. maxFrameSize uint32
  29. // sq contains the stream-specific queues, keyed by stream ID.
  30. // when a stream is idle, it's deleted from the map.
  31. sq map[uint32]*writeQueue
  32. // canSend is a slice of memory that's reused between frame
  33. // scheduling decisions to hold the list of writeQueues (from sq)
  34. // which have enough flow control data to send. After canSend is
  35. // built, the best is selected.
  36. canSend []*writeQueue
  37. }
  38. func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 }
  39. func (ws *writeScheduler) add(wm frameWriteMsg) {
  40. st := wm.stream
  41. if st == nil {
  42. ws.zero.push(wm)
  43. } else {
  44. ws.streamQueue(st.id).push(wm)
  45. }
  46. }
  47. func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue {
  48. if q, ok := ws.sq[streamID]; ok {
  49. return q
  50. }
  51. if ws.sq == nil {
  52. ws.sq = make(map[uint32]*writeQueue)
  53. }
  54. q := new(writeQueue)
  55. ws.sq[streamID] = q
  56. return q
  57. }
  58. // take returns the most important frame to write and removes it from the scheduler.
  59. // It is illegal to call this if the scheduler is empty or if there are no connection-level
  60. // flow control bytes available.
  61. func (ws *writeScheduler) take() (wm frameWriteMsg, ok bool) {
  62. // If there any frames not associated with streams, prefer those first.
  63. // These are usually SETTINGS, etc.
  64. if !ws.zero.empty() {
  65. return ws.zero.shift(), true
  66. }
  67. if len(ws.sq) == 0 {
  68. return
  69. }
  70. // Next, prioritize frames on streams that aren't DATA frames (no cost).
  71. for id, q := range ws.sq {
  72. if q.firstIsNoCost() {
  73. return ws.takeFrom(id, q)
  74. }
  75. }
  76. // Now, all that remains are DATA frames with non-zero bytes to
  77. // send. So pick the best one.
  78. if len(ws.canSend) != 0 {
  79. panic("should be empty")
  80. }
  81. for _, q := range ws.sq {
  82. if n := ws.streamWritableBytes(q); n > 0 {
  83. ws.canSend = append(ws.canSend, q)
  84. }
  85. }
  86. if len(ws.canSend) == 0 {
  87. return
  88. }
  89. defer ws.zeroCanSend()
  90. // TODO: find the best queue
  91. q := ws.canSend[0]
  92. return ws.takeFrom(q.streamID(), q)
  93. }
  94. // zeroCanSend is defered from take.
  95. func (ws *writeScheduler) zeroCanSend() {
  96. for i := range ws.canSend {
  97. ws.canSend[i] = nil
  98. }
  99. ws.canSend = ws.canSend[:0]
  100. }
  101. // streamWritableBytes returns the number of DATA bytes we could write
  102. // from the given queue's stream, if this stream/queue were
  103. // selected. It is an error to call this if q's head isn't a
  104. // *writeData.
  105. func (ws *writeScheduler) streamWritableBytes(q *writeQueue) int32 {
  106. wm := q.head()
  107. ret := wm.stream.flow.available() // max we can write
  108. if ret == 0 {
  109. return 0
  110. }
  111. if int32(ws.maxFrameSize) < ret {
  112. ret = int32(ws.maxFrameSize)
  113. }
  114. if ret == 0 {
  115. panic("internal error: ws.maxFrameSize not initialized or invalid")
  116. }
  117. wd := wm.write.(*writeData)
  118. if len(wd.p) < int(ret) {
  119. ret = int32(len(wd.p))
  120. }
  121. return ret
  122. }
  123. func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg, ok bool) {
  124. wm = q.head()
  125. // If the first item in this queue costs flow control tokens
  126. // and we don't have enough, write as much as we can.
  127. if wd, ok := wm.write.(*writeData); ok {
  128. allowed := wm.stream.flow.available() // max we can write
  129. // We can write 0 byte DATA frame (which usually bears
  130. // END_STREAM, i.e., last DATA frame) even if allowed
  131. // == 0.
  132. if len(wd.p) > 0 && allowed == 0 {
  133. // No quota available. Caller can try the next stream.
  134. return frameWriteMsg{}, false
  135. }
  136. if int32(ws.maxFrameSize) < allowed {
  137. allowed = int32(ws.maxFrameSize)
  138. }
  139. if len(wd.p) > 0 && allowed == 0 {
  140. panic("internal error: ws.maxFrameSize not initialized or invalid")
  141. }
  142. // TODO: further restrict the allowed size, because even if
  143. // the peer says it's okay to write 16MB data frames, we might
  144. // want to write smaller ones to properly weight competing
  145. // streams' priorities.
  146. if len(wd.p) > int(allowed) {
  147. wm.stream.flow.take(allowed)
  148. chunk := wd.p[:allowed]
  149. wd.p = wd.p[allowed:]
  150. // Make up a new write message of a valid size, rather
  151. // than shifting one off the queue.
  152. return frameWriteMsg{
  153. stream: wm.stream,
  154. write: &writeData{
  155. streamID: wd.streamID,
  156. p: chunk,
  157. // even if the original was true, there are bytes
  158. // remaining because len(wd.p) > allowed, so we
  159. // know endStream is false:
  160. endStream: false,
  161. },
  162. // completeness. our caller is blocking on the final
  163. // DATA frame, not these intermediates:
  164. done: nil,
  165. }, true
  166. }
  167. wm.stream.flow.take(int32(len(wd.p)))
  168. }
  169. q.shift()
  170. if q.empty() {
  171. // TODO: reclaim its slice and use it for future allocations
  172. // in the writeScheduler.streamQueue method above when making
  173. // the writeQueue.
  174. delete(ws.sq, id)
  175. }
  176. return wm, true
  177. }
  178. type writeQueue struct {
  179. s []frameWriteMsg
  180. }
  181. // streamID returns the stream ID for a non-empty stream-specific queue.
  182. func (q *writeQueue) streamID() uint32 { return q.s[0].stream.id }
  183. func (q *writeQueue) empty() bool { return len(q.s) == 0 }
  184. func (q *writeQueue) push(wm frameWriteMsg) {
  185. q.s = append(q.s, wm)
  186. }
  187. // head returns the next item that would be removed by shift.
  188. func (q *writeQueue) head() frameWriteMsg {
  189. if len(q.s) == 0 {
  190. panic("invalid use of queue")
  191. }
  192. return q.s[0]
  193. }
  194. func (q *writeQueue) shift() frameWriteMsg {
  195. if len(q.s) == 0 {
  196. panic("invalid use of queue")
  197. }
  198. wm := q.s[0]
  199. // TODO: less copy-happy queue.
  200. copy(q.s, q.s[1:])
  201. q.s[len(q.s)-1] = frameWriteMsg{}
  202. q.s = q.s[:len(q.s)-1]
  203. return wm
  204. }
  205. func (q *writeQueue) firstIsNoCost() bool {
  206. if df, ok := q.s[0].write.(*writeData); ok {
  207. return len(df.p) == 0
  208. }
  209. return true
  210. }