writesched_random.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. // Copyright 2014 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package http2
  5. import "math"
  6. // NewRandomWriteScheduler constructs a WriteScheduler that ignores HTTP/2
  7. // priorities. Control frames like SETTINGS and PING are written before DATA
  8. // frames, but if no control frames are queued and multiple streams have queued
  9. // HEADERS or DATA frames, Pop selects a ready stream arbitrarily.
  10. func NewRandomWriteScheduler() WriteScheduler {
  11. return &randomWriteScheduler{sq: make(map[uint32]*writeQueue)}
  12. }
  13. type randomWriteScheduler struct {
  14. // zero are frames not associated with a specific stream.
  15. zero writeQueue
  16. // sq contains the stream-specific queues, keyed by stream ID.
  17. // When a stream is idle, closed, or emptied, it's deleted
  18. // from the map.
  19. sq map[uint32]*writeQueue
  20. // pool of empty queues for reuse.
  21. queuePool writeQueuePool
  22. }
  23. func (ws *randomWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
  24. // no-op: idle streams are not tracked
  25. }
  26. func (ws *randomWriteScheduler) CloseStream(streamID uint32) {
  27. q, ok := ws.sq[streamID]
  28. if !ok {
  29. return
  30. }
  31. delete(ws.sq, streamID)
  32. ws.queuePool.put(q)
  33. }
  34. func (ws *randomWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {
  35. // no-op: priorities are ignored
  36. }
  37. func (ws *randomWriteScheduler) Push(wr FrameWriteRequest) {
  38. id := wr.StreamID()
  39. if id == 0 {
  40. ws.zero.push(wr)
  41. return
  42. }
  43. q, ok := ws.sq[id]
  44. if !ok {
  45. q = ws.queuePool.get()
  46. ws.sq[id] = q
  47. }
  48. q.push(wr)
  49. }
  50. func (ws *randomWriteScheduler) Pop() (FrameWriteRequest, bool) {
  51. // Control frames first.
  52. if !ws.zero.empty() {
  53. return ws.zero.shift(), true
  54. }
  55. // Iterate over all non-idle streams until finding one that can be consumed.
  56. for streamID, q := range ws.sq {
  57. if wr, ok := q.consume(math.MaxInt32); ok {
  58. if q.empty() {
  59. delete(ws.sq, streamID)
  60. ws.queuePool.put(q)
  61. }
  62. return wr, true
  63. }
  64. }
  65. return FrameWriteRequest{}, false
  66. }