writesched.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. // Copyright 2014 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // See https://code.google.com/p/go/source/browse/CONTRIBUTORS
  5. // Licensed under the same terms as Go itself:
  6. // https://code.google.com/p/go/source/browse/LICENSE
  7. package http2
  8. import "fmt"
  9. // frameWriteMsg is a request to write a frame.
  10. type frameWriteMsg struct {
  11. // write is the interface value that does the writing, once the
  12. // writeScheduler (below) has decided to select this frame
  13. // to write. The write functions are all defined in write.go.
  14. write writeFramer
  15. stream *stream // used for prioritization. nil for non-stream frames.
  16. // done, if non-nil, must be a buffered channel with space for
  17. // 1 message and is sent the return value from write (or an
  18. // earlier error) when the frame has been written.
  19. done chan error
  20. }
  21. // for debugging only:
  22. func (wm frameWriteMsg) String() string {
  23. var streamID uint32
  24. if wm.stream != nil {
  25. streamID = wm.stream.id
  26. }
  27. var des string
  28. if s, ok := wm.write.(fmt.Stringer); ok {
  29. des = s.String()
  30. } else {
  31. des = fmt.Sprintf("%T", wm.write)
  32. }
  33. return fmt.Sprintf("[frameWriteMsg stream=%d, ch=%v, type: %v]", streamID, wm.done != nil, des)
  34. }
  35. // writeScheduler tracks pending frames to write, priorities, and decides
  36. // the next one to use. It is not thread-safe.
  37. type writeScheduler struct {
  38. // zero are frames not associated with a specific stream.
  39. // They're sent before any stream-specific freams.
  40. zero writeQueue
  41. // maxFrameSize is the maximum size of a DATA frame
  42. // we'll write. Must be non-zero and between 16K-16M.
  43. maxFrameSize uint32
  44. // sq contains the stream-specific queues, keyed by stream ID.
  45. // when a stream is idle, it's deleted from the map.
  46. sq map[uint32]*writeQueue
  47. // canSend is a slice of memory that's reused between frame
  48. // scheduling decisions to hold the list of writeQueues (from sq)
  49. // which have enough flow control data to send. After canSend is
  50. // built, the best is selected.
  51. canSend []*writeQueue
  52. }
  53. func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 }
  54. func (ws *writeScheduler) add(wm frameWriteMsg) {
  55. st := wm.stream
  56. if st == nil {
  57. ws.zero.push(wm)
  58. } else {
  59. ws.streamQueue(st.id).push(wm)
  60. }
  61. }
  62. func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue {
  63. if q, ok := ws.sq[streamID]; ok {
  64. return q
  65. }
  66. if ws.sq == nil {
  67. ws.sq = make(map[uint32]*writeQueue)
  68. }
  69. q := new(writeQueue)
  70. ws.sq[streamID] = q
  71. return q
  72. }
  73. // take returns the most important frame to write and removes it from the scheduler.
  74. // It is illegal to call this if the scheduler is empty or if there are no connection-level
  75. // flow control bytes available.
  76. func (ws *writeScheduler) take() (wm frameWriteMsg, ok bool) {
  77. // If there any frames not associated with streams, prefer those first.
  78. // These are usually SETTINGS, etc.
  79. if !ws.zero.empty() {
  80. return ws.zero.shift(), true
  81. }
  82. if len(ws.sq) == 0 {
  83. return
  84. }
  85. // Next, prioritize frames on streams that aren't DATA frames (no cost).
  86. for id, q := range ws.sq {
  87. if q.firstIsNoCost() {
  88. return ws.takeFrom(id, q)
  89. }
  90. }
  91. // Now, all that remains are DATA frames with non-zero bytes to
  92. // send. So pick the best one.
  93. if len(ws.canSend) != 0 {
  94. panic("should be empty")
  95. }
  96. for _, q := range ws.sq {
  97. if n := ws.streamWritableBytes(q); n > 0 {
  98. ws.canSend = append(ws.canSend, q)
  99. }
  100. }
  101. if len(ws.canSend) == 0 {
  102. return
  103. }
  104. defer ws.zeroCanSend()
  105. // TODO: find the best queue
  106. q := ws.canSend[0]
  107. return ws.takeFrom(q.streamID(), q)
  108. }
  109. // zeroCanSend is defered from take.
  110. func (ws *writeScheduler) zeroCanSend() {
  111. for i := range ws.canSend {
  112. ws.canSend[i] = nil
  113. }
  114. ws.canSend = ws.canSend[:0]
  115. }
  116. // streamWritableBytes returns the number of DATA bytes we could write
  117. // from the given queue's stream, if this stream/queue were
  118. // selected. It is an error to call this if q's head isn't a
  119. // *writeData.
  120. func (ws *writeScheduler) streamWritableBytes(q *writeQueue) int32 {
  121. wm := q.head()
  122. ret := wm.stream.flow.available() // max we can write
  123. if ret == 0 {
  124. return 0
  125. }
  126. if int32(ws.maxFrameSize) < ret {
  127. ret = int32(ws.maxFrameSize)
  128. }
  129. if ret == 0 {
  130. panic("internal error: ws.maxFrameSize not initialized or invalid")
  131. }
  132. wd := wm.write.(*writeData)
  133. if len(wd.p) < int(ret) {
  134. ret = int32(len(wd.p))
  135. }
  136. return ret
  137. }
  138. func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg, ok bool) {
  139. wm = q.head()
  140. // If the first item in this queue costs flow control tokens
  141. // and we don't have enough, write as much as we can.
  142. if wd, ok := wm.write.(*writeData); ok {
  143. allowed := wm.stream.flow.available() // max we can write
  144. // We can write 0 byte DATA frame (which usually bears
  145. // END_STREAM, i.e., last DATA frame) even if allowed
  146. // == 0.
  147. if len(wd.p) > 0 && allowed == 0 {
  148. // No quota available. Caller can try the next stream.
  149. return frameWriteMsg{}, false
  150. }
  151. if int32(ws.maxFrameSize) < allowed {
  152. allowed = int32(ws.maxFrameSize)
  153. }
  154. if len(wd.p) > 0 && allowed == 0 {
  155. panic("internal error: ws.maxFrameSize not initialized or invalid")
  156. }
  157. // TODO: further restrict the allowed size, because even if
  158. // the peer says it's okay to write 16MB data frames, we might
  159. // want to write smaller ones to properly weight competing
  160. // streams' priorities.
  161. if len(wd.p) > int(allowed) {
  162. wm.stream.flow.take(allowed)
  163. chunk := wd.p[:allowed]
  164. wd.p = wd.p[allowed:]
  165. // Make up a new write message of a valid size, rather
  166. // than shifting one off the queue.
  167. return frameWriteMsg{
  168. stream: wm.stream,
  169. write: &writeData{
  170. streamID: wd.streamID,
  171. p: chunk,
  172. // even if the original was true, there are bytes
  173. // remaining because len(wd.p) > allowed, so we
  174. // know endStream is false:
  175. endStream: false,
  176. },
  177. // completeness. our caller is blocking on the final
  178. // DATA frame, not these intermediates:
  179. done: nil,
  180. }, true
  181. }
  182. wm.stream.flow.take(int32(len(wd.p)))
  183. }
  184. q.shift()
  185. if q.empty() {
  186. // TODO: reclaim its slice and use it for future allocations
  187. // in the writeScheduler.streamQueue method above when making
  188. // the writeQueue.
  189. delete(ws.sq, id)
  190. }
  191. return wm, true
  192. }
  193. type writeQueue struct {
  194. s []frameWriteMsg
  195. }
  196. // streamID returns the stream ID for a non-empty stream-specific queue.
  197. func (q *writeQueue) streamID() uint32 { return q.s[0].stream.id }
  198. func (q *writeQueue) empty() bool { return len(q.s) == 0 }
  199. func (q *writeQueue) push(wm frameWriteMsg) {
  200. q.s = append(q.s, wm)
  201. }
  202. // head returns the next item that would be removed by shift.
  203. func (q *writeQueue) head() frameWriteMsg {
  204. if len(q.s) == 0 {
  205. panic("invalid use of queue")
  206. }
  207. return q.s[0]
  208. }
  209. func (q *writeQueue) shift() frameWriteMsg {
  210. if len(q.s) == 0 {
  211. panic("invalid use of queue")
  212. }
  213. wm := q.s[0]
  214. // TODO: less copy-happy queue.
  215. copy(q.s, q.s[1:])
  216. q.s[len(q.s)-1] = frameWriteMsg{}
  217. q.s = q.s[:len(q.s)-1]
  218. return wm
  219. }
  220. func (q *writeQueue) firstIsNoCost() bool {
  221. if df, ok := q.s[0].write.(*writeData); ok {
  222. return len(df.p) == 0
  223. }
  224. return true
  225. }