writesched.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. // Copyright 2014 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // See https://code.google.com/p/go/source/browse/CONTRIBUTORS
  5. // Licensed under the same terms as Go itself:
  6. // https://code.google.com/p/go/source/browse/LICENSE
  7. package http2
  8. // frameWriteMsg is a request to write a frame.
  9. type frameWriteMsg struct {
  10. // write is the function that does the writing, once the
  11. // writeScheduler (below) has decided to select this frame
  12. // to write. The write functions are all defined in write.go.
  13. write func(ctx writeContext, v interface{}) error
  14. // v is the argument passed to the write function. See each
  15. // function in write.go to see which type they should be,
  16. // depending on what write is.
  17. v interface{}
  18. cost uint32 // if DATA, number of flow control bytes required
  19. stream *stream // used for prioritization
  20. endStream bool // stream is being closed locally
  21. // done, if non-nil, must be a buffered channel with space for
  22. // 1 message and is sent the return value from write (or an
  23. // earlier error) when the frame has been written.
  24. done chan error
  25. }
  26. // writeScheduler tracks pending frames to write, priorities, and decides
  27. // the next one to use. It is not thread-safe.
  28. type writeScheduler struct {
  29. // zero are frames not associated with a specific stream.
  30. // They're sent before any stream-specific freams.
  31. zero writeQueue
  32. sq map[uint32]*writeQueue
  33. }
  34. func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 }
  35. func (ws *writeScheduler) add(wm frameWriteMsg) {
  36. st := wm.stream
  37. if st == nil {
  38. ws.zero.push(wm)
  39. } else {
  40. ws.streamQueue(st.id).push(wm)
  41. }
  42. }
  43. func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue {
  44. if q, ok := ws.sq[streamID]; ok {
  45. return q
  46. }
  47. if ws.sq == nil {
  48. ws.sq = make(map[uint32]*writeQueue)
  49. }
  50. q := new(writeQueue)
  51. ws.sq[streamID] = q
  52. return q
  53. }
  54. // take returns the most important frame to write and removes it from the scheduler.
  55. // It is illegal to call this if the scheduler is empty.
  56. func (ws *writeScheduler) take() frameWriteMsg {
  57. // If there any frames not associated with streams, prefer those first.
  58. // These are usually SETTINGS, etc.
  59. if !ws.zero.empty() {
  60. return ws.zero.shift()
  61. }
  62. if len(ws.sq) == 0 {
  63. panic("internal error: take should only be called if non-empty")
  64. }
  65. // Next, prioritize frames on streams that aren't DATA frames (no cost).
  66. for id, q := range ws.sq {
  67. if q.firstIsNoCost() {
  68. return ws.takeFrom(id, q)
  69. }
  70. }
  71. // Now, all that remains are DATA frames. So pick the best one.
  72. // TODO: do that. For now, pick a random one.
  73. for id, q := range ws.sq {
  74. return ws.takeFrom(id, q)
  75. }
  76. panic("internal error: take should only be called if non-empty")
  77. }
  78. func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) frameWriteMsg {
  79. wm := q.shift()
  80. if q.empty() {
  81. // TODO: reclaim its slice and use it for future allocations
  82. // in the writeScheduler.streamQueue method above when making
  83. // the writeQueue.
  84. delete(ws.sq, id)
  85. }
  86. return wm
  87. }
  88. type writeQueue struct {
  89. s []frameWriteMsg
  90. }
  91. func (q *writeQueue) empty() bool { return len(q.s) == 0 }
  92. func (q *writeQueue) push(wm frameWriteMsg) {
  93. q.s = append(q.s, wm)
  94. }
  95. func (q *writeQueue) shift() frameWriteMsg {
  96. if len(q.s) == 0 {
  97. panic("invalid use of queue")
  98. }
  99. wm := q.s[0]
  100. // TODO: less copy-happy queue.
  101. copy(q.s, q.s[1:])
  102. q.s[len(q.s)-1] = frameWriteMsg{}
  103. q.s = q.s[:len(q.s)-1]
  104. return wm
  105. }
  106. func (q *writeQueue) firstIsNoCost() bool { return q.s[0].cost == 0 }