|
|
@@ -32,44 +32,95 @@ type frameWriteMsg struct {
|
|
|
// writeScheduler tracks pending frames to write, priorities, and decides
|
|
|
// the next one to use. It is not thread-safe.
|
|
|
type writeScheduler struct {
|
|
|
- slice []frameWriteMsg
|
|
|
+ // zero are frames not associated with a specific stream.
|
|
|
+ // They're sent before any stream-specific freams.
|
|
|
+ zero writeQueue
|
|
|
+
|
|
|
+ sq map[uint32]*writeQueue
|
|
|
}
|
|
|
|
|
|
-func (ws *writeScheduler) empty() bool { return len(ws.slice) == 0 }
|
|
|
+func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 }
|
|
|
|
|
|
func (ws *writeScheduler) add(wm frameWriteMsg) {
|
|
|
- ws.slice = append(ws.slice, wm)
|
|
|
+ st := wm.stream
|
|
|
+ if st == nil {
|
|
|
+ ws.zero.push(wm)
|
|
|
+ } else {
|
|
|
+ ws.streamQueue(st.id).push(wm)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-// take returns
|
|
|
+func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue {
|
|
|
+ if q, ok := ws.sq[streamID]; ok {
|
|
|
+ return q
|
|
|
+ }
|
|
|
+ if ws.sq == nil {
|
|
|
+ ws.sq = make(map[uint32]*writeQueue)
|
|
|
+ }
|
|
|
+ q := new(writeQueue)
|
|
|
+ ws.sq[streamID] = q
|
|
|
+ return q
|
|
|
+}
|
|
|
+
|
|
|
+// take returns the most important frame to write and removes it from the scheduler.
|
|
|
+// It is illegal to call this if the scheduler is empty.
|
|
|
func (ws *writeScheduler) take() frameWriteMsg {
|
|
|
- if ws.empty() {
|
|
|
- panic("internal error: writeScheduler.take called when empty")
|
|
|
+ // If there any frames not associated with streams, prefer those first.
|
|
|
+ // These are usually SETTINGS, etc.
|
|
|
+ if !ws.zero.empty() {
|
|
|
+ return ws.zero.shift()
|
|
|
}
|
|
|
- // TODO:
|
|
|
- // -- prioritize all non-DATA frames first. they're not flow controlled anyway and
|
|
|
- // they're generally more important.
|
|
|
- // -- for all DATA frames that are enqueued (and we should enqueue []byte instead of FRAMES),
|
|
|
- // go over each (in priority order, as determined by the whole priority tree chaos),
|
|
|
- // and decide which we have tokens for, and how many tokens.
|
|
|
-
|
|
|
- // Writing on stream X requires that we have tokens on the
|
|
|
- // stream 0 (the conn-as-a-whole stream) as well as stream X.
|
|
|
-
|
|
|
- // So: find the highest priority stream X, then see: do we
|
|
|
- // have tokens for X? Let's say we have N_X tokens. Then we should
|
|
|
- // write MIN(N_X, TOKENS(conn-wide-tokens)).
|
|
|
- //
|
|
|
- // Any tokens left over? Repeat. Well, not really... the
|
|
|
- // repeat will happen via the next call to
|
|
|
- // scheduleFrameWrite. So keep a HEAP (priqueue) of which
|
|
|
- // streams to write to.
|
|
|
-
|
|
|
- // TODO: proper scheduler
|
|
|
- wm := ws.slice[0]
|
|
|
- // shift it all down. kinda lame. will be removed later anyway.
|
|
|
- copy(ws.slice, ws.slice[1:])
|
|
|
- ws.slice[len(ws.slice)-1] = frameWriteMsg{}
|
|
|
- ws.slice = ws.slice[:len(ws.slice)-1]
|
|
|
+
|
|
|
+ if len(ws.sq) == 0 {
|
|
|
+ panic("internal error: take should only be called if non-empty")
|
|
|
+ }
|
|
|
+
|
|
|
+ // Next, prioritize frames on streams that aren't DATA frames (no cost).
|
|
|
+ for id, q := range ws.sq {
|
|
|
+ if q.firstIsNoCost() {
|
|
|
+ return ws.takeFrom(id, q)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Now, all that remains are DATA frames. So pick the best one.
|
|
|
+ // TODO: do that. For now, pick a random one.
|
|
|
+ for id, q := range ws.sq {
|
|
|
+ return ws.takeFrom(id, q)
|
|
|
+ }
|
|
|
+ panic("internal error: take should only be called if non-empty")
|
|
|
+}
|
|
|
+
|
|
|
+func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) frameWriteMsg {
|
|
|
+ wm := q.shift()
|
|
|
+ if q.empty() {
|
|
|
+ // TODO: reclaim its slice and use it for future allocations
|
|
|
+ // in the writeScheduler.streamQueue method above when making
|
|
|
+ // the writeQueue.
|
|
|
+ delete(ws.sq, id)
|
|
|
+ }
|
|
|
+ return wm
|
|
|
+}
|
|
|
+
|
|
|
+type writeQueue struct {
|
|
|
+ s []frameWriteMsg
|
|
|
+}
|
|
|
+
|
|
|
+func (q *writeQueue) empty() bool { return len(q.s) == 0 }
|
|
|
+
|
|
|
+func (q *writeQueue) push(wm frameWriteMsg) {
|
|
|
+ q.s = append(q.s, wm)
|
|
|
+}
|
|
|
+
|
|
|
+func (q *writeQueue) shift() frameWriteMsg {
|
|
|
+ if len(q.s) == 0 {
|
|
|
+ panic("invalid use of queue")
|
|
|
+ }
|
|
|
+ wm := q.s[0]
|
|
|
+ // TODO: less copy-happy queue.
|
|
|
+ copy(q.s, q.s[1:])
|
|
|
+ q.s[len(q.s)-1] = frameWriteMsg{}
|
|
|
+ q.s = q.s[:len(q.s)-1]
|
|
|
return wm
|
|
|
}
|
|
|
+
|
|
|
+func (q *writeQueue) firstIsNoCost() bool { return q.s[0].cost == 0 }
|