writesched.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. // Copyright 2014 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // See https://code.google.com/p/go/source/browse/CONTRIBUTORS
  5. // Licensed under the same terms as Go itself:
  6. // https://code.google.com/p/go/source/browse/LICENSE
  7. package http2
  8. // frameWriteMsg is a request to write a frame.
  9. type frameWriteMsg struct {
  10. // write is the interface value that does the writing, once the
  11. // writeScheduler (below) has decided to select this frame
  12. // to write. The write functions are all defined in write.go.
  13. write writeFramer
  14. stream *stream // used for prioritization. nil for non-stream frames.
  15. // done, if non-nil, must be a buffered channel with space for
  16. // 1 message and is sent the return value from write (or an
  17. // earlier error) when the frame has been written.
  18. done chan error
  19. }
  20. // writeScheduler tracks pending frames to write, priorities, and decides
  21. // the next one to use. It is not thread-safe.
  22. type writeScheduler struct {
  23. // zero are frames not associated with a specific stream.
  24. // They're sent before any stream-specific freams.
  25. zero writeQueue
  26. // maxFrameSize is the maximum size of a DATA frame
  27. // we'll write.
  28. maxFrameSize uint32
  29. // sq contains the stream-specific queues, keyed by stream ID.
  30. // when a stream is idle, it's deleted from the map.
  31. sq map[uint32]*writeQueue
  32. }
  33. func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 }
  34. func (ws *writeScheduler) add(wm frameWriteMsg) {
  35. st := wm.stream
  36. if st == nil {
  37. ws.zero.push(wm)
  38. } else {
  39. ws.streamQueue(st.id).push(wm)
  40. }
  41. }
  42. func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue {
  43. if q, ok := ws.sq[streamID]; ok {
  44. return q
  45. }
  46. if ws.sq == nil {
  47. ws.sq = make(map[uint32]*writeQueue)
  48. }
  49. q := new(writeQueue)
  50. ws.sq[streamID] = q
  51. return q
  52. }
  53. // take returns the most important frame to write and removes it from the scheduler.
  54. // It is illegal to call this if the scheduler is empty or if there are no connection-level
  55. // flow control bytes available.
  56. func (ws *writeScheduler) take() (wm frameWriteMsg, ok bool) {
  57. // If there any frames not associated with streams, prefer those first.
  58. // These are usually SETTINGS, etc.
  59. if !ws.zero.empty() {
  60. return ws.zero.shift(), true
  61. }
  62. if len(ws.sq) == 0 {
  63. return
  64. }
  65. // Next, prioritize frames on streams that aren't DATA frames (no cost).
  66. for id, q := range ws.sq {
  67. if q.firstIsNoCost() {
  68. return ws.takeFrom(id, q)
  69. }
  70. }
  71. // Now, all that remains are DATA frames. So pick the best one.
  72. // TODO: do that. For now, pick a random one.
  73. for id, q := range ws.sq {
  74. if wm, ok := ws.takeFrom(id, q); ok {
  75. return wm, true
  76. }
  77. }
  78. return
  79. }
  80. func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg, ok bool) {
  81. wm = q.head()
  82. // If the first item in this queue costs flow control tokens
  83. // and we don't have enough, write as much as we can.
  84. if wd, ok := wm.write.(*writeData); ok {
  85. allowed := wm.stream.flow.available() // max we can write
  86. if allowed == 0 {
  87. // No quota available. Caller can try the next stream.
  88. return wm, false
  89. }
  90. if int32(ws.maxFrameSize) < allowed {
  91. allowed = int32(ws.maxFrameSize)
  92. }
  93. if allowed == 0 {
  94. panic("internal error: ws.maxFrameSize not initialized or invalid")
  95. }
  96. // TODO: further restrict the allowed size, because even if
  97. // the peer says it's okay to write 16MB data frames, we might
  98. // want to write smaller ones to properly weight competing
  99. // streams' priorities.
  100. if len(wd.p) > int(allowed) {
  101. wm.stream.flow.take(allowed)
  102. chunk := wd.p[:allowed]
  103. wd.p = wd.p[allowed:]
  104. // Make up a new write message of a valid size, rather
  105. // than shifting one off the queue.
  106. return frameWriteMsg{
  107. stream: wm.stream,
  108. write: &writeData{
  109. streamID: wd.streamID,
  110. p: chunk,
  111. // even if the original was true, there are bytes
  112. // remaining because len(wd.p) > allowed, so we
  113. // know endStream is false:
  114. endStream: false,
  115. },
  116. // completeness. our caller is blocking on the final
  117. // DATA frame, not these intermediates:
  118. done: nil,
  119. }, true
  120. }
  121. }
  122. q.shift()
  123. if q.empty() {
  124. // TODO: reclaim its slice and use it for future allocations
  125. // in the writeScheduler.streamQueue method above when making
  126. // the writeQueue.
  127. delete(ws.sq, id)
  128. }
  129. return wm, true
  130. }
  131. type writeQueue struct {
  132. s []frameWriteMsg
  133. }
  134. func (q *writeQueue) empty() bool { return len(q.s) == 0 }
  135. func (q *writeQueue) push(wm frameWriteMsg) {
  136. q.s = append(q.s, wm)
  137. }
  138. // head returns the next item that would be removed by shift.
  139. func (q *writeQueue) head() frameWriteMsg {
  140. if len(q.s) == 0 {
  141. panic("invalid use of queue")
  142. }
  143. return q.s[0]
  144. }
  145. func (q *writeQueue) shift() frameWriteMsg {
  146. if len(q.s) == 0 {
  147. panic("invalid use of queue")
  148. }
  149. wm := q.s[0]
  150. // TODO: less copy-happy queue.
  151. copy(q.s, q.s[1:])
  152. q.s[len(q.s)-1] = frameWriteMsg{}
  153. q.s = q.s[:len(q.s)-1]
  154. return wm
  155. }
  156. func (q *writeQueue) firstIsNoCost() bool {
  157. if df, ok := q.s[0].write.(*writeData); ok {
  158. return len(df.p) == 0
  159. }
  160. return true
  161. }