|
|
@@ -229,9 +229,9 @@ type serverConn struct {
|
|
|
req requestParam // non-zero while reading request headers
|
|
|
writingFrame bool // started write goroutine but haven't heard back on wroteFrameCh
|
|
|
needsFrameFlush bool // last frame write wasn't a flush
|
|
|
- writeQueue []frameWriteMsg // TODO: proper scheduler, not a queue
|
|
|
- inGoAway bool // we've started to or sent GOAWAY
|
|
|
- needToSendGoAway bool // we need to schedule a GOAWAY frame write
|
|
|
+ writeSched writeScheduler
|
|
|
+ inGoAway bool // we've started to or sent GOAWAY
|
|
|
+ needToSendGoAway bool // we need to schedule a GOAWAY frame write
|
|
|
goAwayCode ErrCode
|
|
|
shutdownTimerCh <-chan time.Time // nil until used
|
|
|
shutdownTimer *time.Timer // nil until used
|
|
|
@@ -598,7 +598,7 @@ func (sc *serverConn) writeFrame(wm frameWriteMsg) {
|
|
|
sc.startFrameWrite(wm)
|
|
|
return
|
|
|
}
|
|
|
- sc.writeQueue = append(sc.writeQueue, wm) // TODO: proper scheduler
|
|
|
+ sc.writeSched.add(wm)
|
|
|
}
|
|
|
|
|
|
// startFrameWrite starts a goroutine to write wm (in a separate
|
|
|
@@ -669,7 +669,7 @@ func (sc *serverConn) scheduleFrameWrite() {
|
|
|
})
|
|
|
return
|
|
|
}
|
|
|
- if len(sc.writeQueue) == 0 && sc.needsFrameFlush {
|
|
|
+ if sc.writeSched.empty() && sc.needsFrameFlush {
|
|
|
sc.startFrameWrite(frameWriteMsg{write: (*serverConn).flushFrameWriter})
|
|
|
sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
|
|
|
return
|
|
|
@@ -683,40 +683,14 @@ func (sc *serverConn) scheduleFrameWrite() {
|
|
|
sc.startFrameWrite(frameWriteMsg{write: (*serverConn).writeSettingsAck})
|
|
|
return
|
|
|
}
|
|
|
- if len(sc.writeQueue) == 0 {
|
|
|
+ if sc.writeSched.empty() {
|
|
|
return
|
|
|
}
|
|
|
-
|
|
|
- // TODO:
|
|
|
- // -- prioritize all non-DATA frames first. they're not flow controlled anyway and
|
|
|
- // they're generally more important.
|
|
|
- // -- for all DATA frames that are enqueued (and we should enqueue []byte instead of FRAMES),
|
|
|
- // go over each (in priority order, as determined by the whole priority tree chaos),
|
|
|
- // and decide which we have tokens for, and how many tokens.
|
|
|
-
|
|
|
- // Writing on stream X requires that we have tokens on the
|
|
|
- // stream 0 (the conn-as-a-whole stream) as well as stream X.
|
|
|
-
|
|
|
- // So: find the highest priority stream X, then see: do we
|
|
|
- // have tokens for X? Let's say we have N_X tokens. Then we should
|
|
|
- // write MIN(N_X, TOKENS(conn-wide-tokens)).
|
|
|
- //
|
|
|
- // Any tokens left over? Repeat. Well, not really... the
|
|
|
- // repeat will happen via the next call to
|
|
|
- // scheduleFrameWrite. So keep a HEAP (priqueue) of which
|
|
|
- // streams to write to.
|
|
|
-
|
|
|
- // TODO: proper scheduler
|
|
|
- wm := sc.writeQueue[0]
|
|
|
- // shift it all down. kinda lame. will be removed later anyway.
|
|
|
- copy(sc.writeQueue, sc.writeQueue[1:])
|
|
|
- sc.writeQueue = sc.writeQueue[:len(sc.writeQueue)-1]
|
|
|
-
|
|
|
// TODO: if wm is a data frame, make sure it's not too big
|
|
|
// (because a SETTINGS frame changed our max frame size while
|
|
|
// a stream was open and writing) and cut it up into smaller
|
|
|
// bits.
|
|
|
- sc.startFrameWrite(wm)
|
|
|
+ sc.startFrameWrite(sc.writeSched.take())
|
|
|
}
|
|
|
|
|
|
func (sc *serverConn) goAway(code ErrCode) {
|