writesched.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. // Copyright 2014 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package http2
  5. import "fmt"
  6. // frameWriteMsg is a request to write a frame.
  7. type frameWriteMsg struct {
  8. // write is the interface value that does the writing, once the
  9. // writeScheduler (below) has decided to select this frame
  10. // to write. The write functions are all defined in write.go.
  11. write writeFramer
  12. stream *stream // used for prioritization. nil for non-stream frames.
  13. // done, if non-nil, must be a buffered channel with space for
  14. // 1 message and is sent the return value from write (or an
  15. // earlier error) when the frame has been written.
  16. done chan error
  17. }
  18. // for debugging only:
  19. func (wm frameWriteMsg) String() string {
  20. var streamID uint32
  21. if wm.stream != nil {
  22. streamID = wm.stream.id
  23. }
  24. var des string
  25. if s, ok := wm.write.(fmt.Stringer); ok {
  26. des = s.String()
  27. } else {
  28. des = fmt.Sprintf("%T", wm.write)
  29. }
  30. return fmt.Sprintf("[frameWriteMsg stream=%d, ch=%v, type: %v]", streamID, wm.done != nil, des)
  31. }
  32. // writeScheduler tracks pending frames to write, priorities, and decides
  33. // the next one to use. It is not thread-safe.
  34. type writeScheduler struct {
  35. // zero are frames not associated with a specific stream.
  36. // They're sent before any stream-specific freams.
  37. zero writeQueue
  38. // maxFrameSize is the maximum size of a DATA frame
  39. // we'll write. Must be non-zero and between 16K-16M.
  40. maxFrameSize uint32
  41. // sq contains the stream-specific queues, keyed by stream ID.
  42. // when a stream is idle, it's deleted from the map.
  43. sq map[uint32]*writeQueue
  44. // canSend is a slice of memory that's reused between frame
  45. // scheduling decisions to hold the list of writeQueues (from sq)
  46. // which have enough flow control data to send. After canSend is
  47. // built, the best is selected.
  48. canSend []*writeQueue
  49. // pool of empty queues for reuse.
  50. queuePool []*writeQueue
  51. }
  52. func (ws *writeScheduler) putEmptyQueue(q *writeQueue) {
  53. if len(q.s) != 0 {
  54. panic("queue must be empty")
  55. }
  56. ws.queuePool = append(ws.queuePool, q)
  57. }
  58. func (ws *writeScheduler) getEmptyQueue() *writeQueue {
  59. ln := len(ws.queuePool)
  60. if ln == 0 {
  61. return new(writeQueue)
  62. }
  63. q := ws.queuePool[ln-1]
  64. ws.queuePool = ws.queuePool[:ln-1]
  65. return q
  66. }
  67. func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 }
  68. func (ws *writeScheduler) add(wm frameWriteMsg) {
  69. st := wm.stream
  70. if st == nil {
  71. ws.zero.push(wm)
  72. } else {
  73. ws.streamQueue(st.id).push(wm)
  74. }
  75. }
  76. func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue {
  77. if q, ok := ws.sq[streamID]; ok {
  78. return q
  79. }
  80. if ws.sq == nil {
  81. ws.sq = make(map[uint32]*writeQueue)
  82. }
  83. q := ws.getEmptyQueue()
  84. ws.sq[streamID] = q
  85. return q
  86. }
  87. // take returns the most important frame to write and removes it from the scheduler.
  88. // It is illegal to call this if the scheduler is empty or if there are no connection-level
  89. // flow control bytes available.
  90. func (ws *writeScheduler) take() (wm frameWriteMsg, ok bool) {
  91. if ws.maxFrameSize == 0 {
  92. panic("internal error: ws.maxFrameSize not initialized or invalid")
  93. }
  94. // If there any frames not associated with streams, prefer those first.
  95. // These are usually SETTINGS, etc.
  96. if !ws.zero.empty() {
  97. return ws.zero.shift(), true
  98. }
  99. if len(ws.sq) == 0 {
  100. return
  101. }
  102. // Next, prioritize frames on streams that aren't DATA frames (no cost).
  103. for id, q := range ws.sq {
  104. if q.firstIsNoCost() {
  105. return ws.takeFrom(id, q)
  106. }
  107. }
  108. // Now, all that remains are DATA frames with non-zero bytes to
  109. // send. So pick the best one.
  110. if len(ws.canSend) != 0 {
  111. panic("should be empty")
  112. }
  113. for _, q := range ws.sq {
  114. if n := ws.streamWritableBytes(q); n > 0 {
  115. ws.canSend = append(ws.canSend, q)
  116. }
  117. }
  118. if len(ws.canSend) == 0 {
  119. return
  120. }
  121. defer ws.zeroCanSend()
  122. // TODO: find the best queue
  123. q := ws.canSend[0]
  124. return ws.takeFrom(q.streamID(), q)
  125. }
  126. // zeroCanSend is defered from take.
  127. func (ws *writeScheduler) zeroCanSend() {
  128. for i := range ws.canSend {
  129. ws.canSend[i] = nil
  130. }
  131. ws.canSend = ws.canSend[:0]
  132. }
  133. // streamWritableBytes returns the number of DATA bytes we could write
  134. // from the given queue's stream, if this stream/queue were
  135. // selected. It is an error to call this if q's head isn't a
  136. // *writeData.
  137. func (ws *writeScheduler) streamWritableBytes(q *writeQueue) int32 {
  138. wm := q.head()
  139. ret := wm.stream.flow.available() // max we can write
  140. if ret == 0 {
  141. return 0
  142. }
  143. if int32(ws.maxFrameSize) < ret {
  144. ret = int32(ws.maxFrameSize)
  145. }
  146. if ret == 0 {
  147. panic("internal error: ws.maxFrameSize not initialized or invalid")
  148. }
  149. wd := wm.write.(*writeData)
  150. if len(wd.p) < int(ret) {
  151. ret = int32(len(wd.p))
  152. }
  153. return ret
  154. }
  155. func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg, ok bool) {
  156. wm = q.head()
  157. // If the first item in this queue costs flow control tokens
  158. // and we don't have enough, write as much as we can.
  159. if wd, ok := wm.write.(*writeData); ok && len(wd.p) > 0 {
  160. allowed := wm.stream.flow.available() // max we can write
  161. if allowed == 0 {
  162. // No quota available. Caller can try the next stream.
  163. return frameWriteMsg{}, false
  164. }
  165. if int32(ws.maxFrameSize) < allowed {
  166. allowed = int32(ws.maxFrameSize)
  167. }
  168. // TODO: further restrict the allowed size, because even if
  169. // the peer says it's okay to write 16MB data frames, we might
  170. // want to write smaller ones to properly weight competing
  171. // streams' priorities.
  172. if len(wd.p) > int(allowed) {
  173. wm.stream.flow.take(allowed)
  174. chunk := wd.p[:allowed]
  175. wd.p = wd.p[allowed:]
  176. // Make up a new write message of a valid size, rather
  177. // than shifting one off the queue.
  178. return frameWriteMsg{
  179. stream: wm.stream,
  180. write: &writeData{
  181. streamID: wd.streamID,
  182. p: chunk,
  183. // even if the original had endStream set, there
  184. // arebytes remaining because len(wd.p) > allowed,
  185. // so we know endStream is false:
  186. endStream: false,
  187. },
  188. // our caller is blocking on the final DATA frame, not
  189. // these intermediates, so no need to wait:
  190. done: nil,
  191. }, true
  192. }
  193. wm.stream.flow.take(int32(len(wd.p)))
  194. }
  195. q.shift()
  196. if q.empty() {
  197. ws.putEmptyQueue(q)
  198. delete(ws.sq, id)
  199. }
  200. return wm, true
  201. }
  202. func (ws *writeScheduler) forgetStream(id uint32) {
  203. q, ok := ws.sq[id]
  204. if !ok {
  205. return
  206. }
  207. delete(ws.sq, id)
  208. // But keep it for others later.
  209. for i := range q.s {
  210. q.s[i] = frameWriteMsg{}
  211. }
  212. q.s = q.s[:0]
  213. ws.putEmptyQueue(q)
  214. }
  215. type writeQueue struct {
  216. s []frameWriteMsg
  217. }
  218. // streamID returns the stream ID for a non-empty stream-specific queue.
  219. func (q *writeQueue) streamID() uint32 { return q.s[0].stream.id }
  220. func (q *writeQueue) empty() bool { return len(q.s) == 0 }
  221. func (q *writeQueue) push(wm frameWriteMsg) {
  222. q.s = append(q.s, wm)
  223. }
  224. // head returns the next item that would be removed by shift.
  225. func (q *writeQueue) head() frameWriteMsg {
  226. if len(q.s) == 0 {
  227. panic("invalid use of queue")
  228. }
  229. return q.s[0]
  230. }
  231. func (q *writeQueue) shift() frameWriteMsg {
  232. if len(q.s) == 0 {
  233. panic("invalid use of queue")
  234. }
  235. wm := q.s[0]
  236. // TODO: less copy-happy queue.
  237. copy(q.s, q.s[1:])
  238. q.s[len(q.s)-1] = frameWriteMsg{}
  239. q.s = q.s[:len(q.s)-1]
  240. return wm
  241. }
  242. func (q *writeQueue) firstIsNoCost() bool {
  243. if df, ok := q.s[0].write.(*writeData); ok {
  244. return len(df.p) == 0
  245. }
  246. return true
  247. }