writesched.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. // Copyright 2014 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // See https://code.google.com/p/go/source/browse/CONTRIBUTORS
  5. // Licensed under the same terms as Go itself:
  6. // https://code.google.com/p/go/source/browse/LICENSE
  7. package http2
  8. // frameWriteMsg is a request to write a frame.
  9. type frameWriteMsg struct {
  10. // write is the interface value that does the writing, once the
  11. // writeScheduler (below) has decided to select this frame
  12. // to write. The write functions are all defined in write.go.
  13. write writeFramer
  14. stream *stream // used for prioritization. nil for non-stream frames.
  15. // done, if non-nil, must be a buffered channel with space for
  16. // 1 message and is sent the return value from write (or an
  17. // earlier error) when the frame has been written.
  18. done chan error
  19. }
  20. // writeScheduler tracks pending frames to write, priorities, and decides
  21. // the next one to use. It is not thread-safe.
  22. type writeScheduler struct {
  23. // zero are frames not associated with a specific stream.
  24. // They're sent before any stream-specific freams.
  25. zero writeQueue
  26. // sq contains the stream-specific queues, keyed by stream ID.
  27. // when a stream is idle, it's deleted from the map.
  28. sq map[uint32]*writeQueue
  29. }
  30. func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 }
  31. func (ws *writeScheduler) add(wm frameWriteMsg) {
  32. st := wm.stream
  33. if st == nil {
  34. ws.zero.push(wm)
  35. } else {
  36. ws.streamQueue(st.id).push(wm)
  37. }
  38. }
  39. func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue {
  40. if q, ok := ws.sq[streamID]; ok {
  41. return q
  42. }
  43. if ws.sq == nil {
  44. ws.sq = make(map[uint32]*writeQueue)
  45. }
  46. q := new(writeQueue)
  47. ws.sq[streamID] = q
  48. return q
  49. }
  50. // take returns the most important frame to write and removes it from the scheduler.
  51. // It is illegal to call this if the scheduler is empty.
  52. func (ws *writeScheduler) take() frameWriteMsg {
  53. // If there any frames not associated with streams, prefer those first.
  54. // These are usually SETTINGS, etc.
  55. if !ws.zero.empty() {
  56. return ws.zero.shift()
  57. }
  58. if len(ws.sq) == 0 {
  59. panic("internal error: take should only be called if non-empty")
  60. }
  61. // Next, prioritize frames on streams that aren't DATA frames (no cost).
  62. for id, q := range ws.sq {
  63. if q.firstIsNoCost() {
  64. return ws.takeFrom(id, q)
  65. }
  66. }
  67. // Now, all that remains are DATA frames. So pick the best one.
  68. // TODO: do that. For now, pick a random one.
  69. for id, q := range ws.sq {
  70. return ws.takeFrom(id, q)
  71. }
  72. panic("internal error: take should only be called if non-empty")
  73. }
  74. func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) frameWriteMsg {
  75. wm := q.shift()
  76. if q.empty() {
  77. // TODO: reclaim its slice and use it for future allocations
  78. // in the writeScheduler.streamQueue method above when making
  79. // the writeQueue.
  80. delete(ws.sq, id)
  81. }
  82. return wm
  83. }
  84. type writeQueue struct {
  85. s []frameWriteMsg
  86. }
  87. func (q *writeQueue) empty() bool { return len(q.s) == 0 }
  88. func (q *writeQueue) push(wm frameWriteMsg) {
  89. q.s = append(q.s, wm)
  90. }
  91. func (q *writeQueue) shift() frameWriteMsg {
  92. if len(q.s) == 0 {
  93. panic("invalid use of queue")
  94. }
  95. wm := q.s[0]
  96. // TODO: less copy-happy queue.
  97. copy(q.s, q.s[1:])
  98. q.s[len(q.s)-1] = frameWriteMsg{}
  99. q.s = q.s[:len(q.s)-1]
  100. return wm
  101. }
  102. func (q *writeQueue) firstIsNoCost() bool {
  103. if df, ok := q.s[0].write.(*writeData); ok {
  104. return len(df.p) == 0
  105. }
  106. return true
  107. }