writesched.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. // Copyright 2014 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // See https://code.google.com/p/go/source/browse/CONTRIBUTORS
  5. // Licensed under the same terms as Go itself:
  6. // https://code.google.com/p/go/source/browse/LICENSE
  7. package http2
  8. import "fmt"
  9. // frameWriteMsg is a request to write a frame.
  10. type frameWriteMsg struct {
  11. // write is the interface value that does the writing, once the
  12. // writeScheduler (below) has decided to select this frame
  13. // to write. The write functions are all defined in write.go.
  14. write writeFramer
  15. stream *stream // used for prioritization. nil for non-stream frames.
  16. // done, if non-nil, must be a buffered channel with space for
  17. // 1 message and is sent the return value from write (or an
  18. // earlier error) when the frame has been written.
  19. done chan error
  20. }
  21. // for debugging only:
  22. func (wm frameWriteMsg) String() string {
  23. var streamID uint32
  24. if wm.stream != nil {
  25. streamID = wm.stream.id
  26. }
  27. var des string
  28. if s, ok := wm.write.(fmt.Stringer); ok {
  29. des = s.String()
  30. } else {
  31. des = fmt.Sprintf("%T", wm.write)
  32. }
  33. return fmt.Sprintf("[frameWriteMsg stream=%d, ch=%v, type: %v]", streamID, wm.done != nil, des)
  34. }
  35. // writeScheduler tracks pending frames to write, priorities, and decides
  36. // the next one to use. It is not thread-safe.
  37. type writeScheduler struct {
  38. // zero are frames not associated with a specific stream.
  39. // They're sent before any stream-specific freams.
  40. zero writeQueue
  41. // maxFrameSize is the maximum size of a DATA frame
  42. // we'll write. Must be non-zero and between 16K-16M.
  43. maxFrameSize uint32
  44. // sq contains the stream-specific queues, keyed by stream ID.
  45. // when a stream is idle, it's deleted from the map.
  46. sq map[uint32]*writeQueue
  47. // canSend is a slice of memory that's reused between frame
  48. // scheduling decisions to hold the list of writeQueues (from sq)
  49. // which have enough flow control data to send. After canSend is
  50. // built, the best is selected.
  51. canSend []*writeQueue
  52. // pool of empty queues for reuse.
  53. queuePool []*writeQueue
  54. }
  55. func (ws *writeScheduler) putEmptyQueue(q *writeQueue) {
  56. if len(q.s) != 0 {
  57. panic("queue must be empty")
  58. }
  59. ws.queuePool = append(ws.queuePool, q)
  60. }
  61. func (ws *writeScheduler) getEmptyQueue() *writeQueue {
  62. ln := len(ws.queuePool)
  63. if ln == 0 {
  64. return new(writeQueue)
  65. }
  66. q := ws.queuePool[ln-1]
  67. ws.queuePool = ws.queuePool[:ln-1]
  68. return q
  69. }
  70. func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 }
  71. func (ws *writeScheduler) add(wm frameWriteMsg) {
  72. st := wm.stream
  73. if st == nil {
  74. ws.zero.push(wm)
  75. } else {
  76. ws.streamQueue(st.id).push(wm)
  77. }
  78. }
  79. func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue {
  80. if q, ok := ws.sq[streamID]; ok {
  81. return q
  82. }
  83. if ws.sq == nil {
  84. ws.sq = make(map[uint32]*writeQueue)
  85. }
  86. q := ws.getEmptyQueue()
  87. ws.sq[streamID] = q
  88. return q
  89. }
  90. // take returns the most important frame to write and removes it from the scheduler.
  91. // It is illegal to call this if the scheduler is empty or if there are no connection-level
  92. // flow control bytes available.
  93. func (ws *writeScheduler) take() (wm frameWriteMsg, ok bool) {
  94. if ws.maxFrameSize == 0 {
  95. panic("internal error: ws.maxFrameSize not initialized or invalid")
  96. }
  97. // If there any frames not associated with streams, prefer those first.
  98. // These are usually SETTINGS, etc.
  99. if !ws.zero.empty() {
  100. return ws.zero.shift(), true
  101. }
  102. if len(ws.sq) == 0 {
  103. return
  104. }
  105. // Next, prioritize frames on streams that aren't DATA frames (no cost).
  106. for id, q := range ws.sq {
  107. if q.firstIsNoCost() {
  108. return ws.takeFrom(id, q)
  109. }
  110. }
  111. // Now, all that remains are DATA frames with non-zero bytes to
  112. // send. So pick the best one.
  113. if len(ws.canSend) != 0 {
  114. panic("should be empty")
  115. }
  116. for _, q := range ws.sq {
  117. if n := ws.streamWritableBytes(q); n > 0 {
  118. ws.canSend = append(ws.canSend, q)
  119. }
  120. }
  121. if len(ws.canSend) == 0 {
  122. return
  123. }
  124. defer ws.zeroCanSend()
  125. // TODO: find the best queue
  126. q := ws.canSend[0]
  127. return ws.takeFrom(q.streamID(), q)
  128. }
  129. // zeroCanSend is defered from take.
  130. func (ws *writeScheduler) zeroCanSend() {
  131. for i := range ws.canSend {
  132. ws.canSend[i] = nil
  133. }
  134. ws.canSend = ws.canSend[:0]
  135. }
  136. // streamWritableBytes returns the number of DATA bytes we could write
  137. // from the given queue's stream, if this stream/queue were
  138. // selected. It is an error to call this if q's head isn't a
  139. // *writeData.
  140. func (ws *writeScheduler) streamWritableBytes(q *writeQueue) int32 {
  141. wm := q.head()
  142. ret := wm.stream.flow.available() // max we can write
  143. if ret == 0 {
  144. return 0
  145. }
  146. if int32(ws.maxFrameSize) < ret {
  147. ret = int32(ws.maxFrameSize)
  148. }
  149. if ret == 0 {
  150. panic("internal error: ws.maxFrameSize not initialized or invalid")
  151. }
  152. wd := wm.write.(*writeData)
  153. if len(wd.p) < int(ret) {
  154. ret = int32(len(wd.p))
  155. }
  156. return ret
  157. }
  158. func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg, ok bool) {
  159. wm = q.head()
  160. // If the first item in this queue costs flow control tokens
  161. // and we don't have enough, write as much as we can.
  162. if wd, ok := wm.write.(*writeData); ok && len(wd.p) > 0 {
  163. allowed := wm.stream.flow.available() // max we can write
  164. if allowed == 0 {
  165. // No quota available. Caller can try the next stream.
  166. return frameWriteMsg{}, false
  167. }
  168. if int32(ws.maxFrameSize) < allowed {
  169. allowed = int32(ws.maxFrameSize)
  170. }
  171. // TODO: further restrict the allowed size, because even if
  172. // the peer says it's okay to write 16MB data frames, we might
  173. // want to write smaller ones to properly weight competing
  174. // streams' priorities.
  175. if len(wd.p) > int(allowed) {
  176. wm.stream.flow.take(allowed)
  177. chunk := wd.p[:allowed]
  178. wd.p = wd.p[allowed:]
  179. // Make up a new write message of a valid size, rather
  180. // than shifting one off the queue.
  181. return frameWriteMsg{
  182. stream: wm.stream,
  183. write: &writeData{
  184. streamID: wd.streamID,
  185. p: chunk,
  186. // even if the original had endStream set, there
  187. // arebytes remaining because len(wd.p) > allowed,
  188. // so we know endStream is false:
  189. endStream: false,
  190. },
  191. // our caller is blocking on the final DATA frame, not
  192. // these intermediates, so no need to wait:
  193. done: nil,
  194. }, true
  195. }
  196. wm.stream.flow.take(int32(len(wd.p)))
  197. }
  198. q.shift()
  199. if q.empty() {
  200. ws.putEmptyQueue(q)
  201. delete(ws.sq, id)
  202. }
  203. return wm, true
  204. }
  205. func (ws *writeScheduler) forgetStream(id uint32) {
  206. q, ok := ws.sq[id]
  207. if !ok {
  208. return
  209. }
  210. delete(ws.sq, id)
  211. // But keep it for others later.
  212. for i := range q.s {
  213. q.s[i] = frameWriteMsg{}
  214. }
  215. q.s = q.s[:0]
  216. ws.putEmptyQueue(q)
  217. }
  218. type writeQueue struct {
  219. s []frameWriteMsg
  220. }
  221. // streamID returns the stream ID for a non-empty stream-specific queue.
  222. func (q *writeQueue) streamID() uint32 { return q.s[0].stream.id }
  223. func (q *writeQueue) empty() bool { return len(q.s) == 0 }
  224. func (q *writeQueue) push(wm frameWriteMsg) {
  225. q.s = append(q.s, wm)
  226. }
  227. // head returns the next item that would be removed by shift.
  228. func (q *writeQueue) head() frameWriteMsg {
  229. if len(q.s) == 0 {
  230. panic("invalid use of queue")
  231. }
  232. return q.s[0]
  233. }
  234. func (q *writeQueue) shift() frameWriteMsg {
  235. if len(q.s) == 0 {
  236. panic("invalid use of queue")
  237. }
  238. wm := q.s[0]
  239. // TODO: less copy-happy queue.
  240. copy(q.s, q.s[1:])
  241. q.s[len(q.s)-1] = frameWriteMsg{}
  242. q.s = q.s[:len(q.s)-1]
  243. return wm
  244. }
  245. func (q *writeQueue) firstIsNoCost() bool {
  246. if df, ok := q.s[0].write.(*writeData); ok {
  247. return len(df.p) == 0
  248. }
  249. return true
  250. }