|
|
@@ -67,20 +67,17 @@ var (
|
|
|
// TODO: send PING frames to idle clients and disconnect them if no
|
|
|
// reply
|
|
|
|
|
|
-// TODO: don't keep the writeFrames goroutine active. turn it off when no frames
|
|
|
-// are enqueued.
|
|
|
-
|
|
|
-// TODO: for bonus points: turn off the serve goroutine also when
|
|
|
-// idle, so an idle conn only has the readFrames goroutine
|
|
|
-// active. (which could also be optimized probably to pin less memory
|
|
|
-// in crypto/tls). This would involve tracking when the serve
|
|
|
-// goroutine is active (atomic int32 read/CAS probably?) and starting
|
|
|
-// it up when frames arrive, and shutting it down when all handlers
|
|
|
-// exit. the occasional PING packets could use time.AfterFunc to call
|
|
|
-// sc.wakeStartServeLoop() (which is a no-op if already running) and
|
|
|
-// then queue the PING write as normal. The serve loop would then exit
|
|
|
-// in most cases (if no Handlers running) and not be woken up again
|
|
|
-// until the PING packet returns.
|
|
|
+// TODO: for bonus points: turn off the serve goroutine when idle, so
|
|
|
+// an idle conn only has the readFrames goroutine active. (which could
|
|
|
+// also be optimized probably to pin less memory in crypto/tls). This
|
|
|
+// would involve tracking when the serve goroutine is active (atomic
|
|
|
+// int32 read/CAS probably?) and starting it up when frames arrive,
|
|
|
+// and shutting it down when all handlers exit. the occasional PING
|
|
|
+// packets could use time.AfterFunc to call sc.wakeStartServeLoop()
|
|
|
+// (which is a no-op if already running) and then queue the PING write
|
|
|
+// as normal. The serve loop would then exit in most cases (if no
|
|
|
+// Handlers running) and not be woken up again until the PING packet
|
|
|
+// returns.
|
|
|
|
|
|
// Server is an HTTP/2 server.
|
|
|
type Server struct {
|
|
|
@@ -164,7 +161,7 @@ func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler) {
|
|
|
readFrameCh: make(chan frameAndGate),
|
|
|
readFrameErrCh: make(chan error, 1), // must be buffered for 1
|
|
|
wantWriteFrameCh: make(chan frameWriteMsg, 8),
|
|
|
- wroteFrameCh: make(chan struct{}, 1), // TODO: consider 0. will deadlock currently in sendFrameWrite in sentReset case
|
|
|
+ wroteFrameCh: make(chan struct{}, 1), // TODO: consider 0. will deadlock currently in startFrameWrite in sentReset case
|
|
|
flow: newFlow(initialWindowSize),
|
|
|
doneServing: make(chan struct{}),
|
|
|
advMaxStreams: srv.maxConcurrentStreams(),
|
|
|
@@ -210,14 +207,12 @@ type serverConn struct {
|
|
|
readFrameCh chan frameAndGate // written by serverConn.readFrames
|
|
|
readFrameErrCh chan error
|
|
|
wantWriteFrameCh chan frameWriteMsg // from handlers -> serve
|
|
|
- wroteFrameCh chan struct{} // from writeFrames -> serve, tickles more frame writes
|
|
|
+ wroteFrameCh chan struct{} // from writeFrameAsync -> serve, tickles more frame writes
|
|
|
testHookCh chan func() // code to run on the serve loop
|
|
|
-
|
|
|
- serveG goroutineLock // used to verify funcs are on serve()
|
|
|
- writeG goroutineLock // used to verify things running on writeLoop
|
|
|
- flow *flow // connection-wide (not stream-specific) flow control
|
|
|
+ flow *flow // connection-wide (not stream-specific) flow control
|
|
|
|
|
|
// Everything following is owned by the serve loop; use serveG.check():
|
|
|
+ serveG goroutineLock // used to verify funcs are on serve()
|
|
|
pushEnabled bool
|
|
|
sawFirstSettings bool // got the initial SETTINGS frame after the preface
|
|
|
needToSendSettingsAck bool
|
|
|
@@ -241,7 +236,8 @@ type serverConn struct {
|
|
|
shutdownTimerCh <-chan time.Time // nil until used
|
|
|
shutdownTimer *time.Timer // nil until used
|
|
|
|
|
|
- // Owned by the writeFrames goroutine; use writeG.check():
|
|
|
+ // Owned by the writeFrameAsync goroutine; use writeG.check():
|
|
|
+ writeG goroutineLock // used to verify things running on writeFrameAsync
|
|
|
headerWriteBuf bytes.Buffer
|
|
|
hpackEncoder *hpack.Encoder
|
|
|
}
|
|
|
@@ -594,20 +590,19 @@ func (sc *serverConn) writeFrame(wm frameWriteMsg) {
|
|
|
sc.serveG.check()
|
|
|
// Fast path for common case:
|
|
|
if !sc.writingFrame {
|
|
|
- sc.sendFrameWrite(wm)
|
|
|
+ sc.startFrameWrite(wm)
|
|
|
return
|
|
|
}
|
|
|
sc.writeQueue = append(sc.writeQueue, wm) // TODO: proper scheduler
|
|
|
}
|
|
|
|
|
|
-// sendFrameWrite sends a frame to the writeFrames goroutine.
|
|
|
-// Only one frame can be in-flight at a time.
|
|
|
-// sendFrameWrite also updates stream state right before the frame is
|
|
|
-// sent to be written.
|
|
|
-func (sc *serverConn) sendFrameWrite(wm frameWriteMsg) {
|
|
|
+// startFrameWrite starts a goroutine to write wm (in a separate
|
|
|
+// goroutine since that might block on the network), and updates the
|
|
|
+// serve goroutine's state about the world, updated from info in wm.
|
|
|
+func (sc *serverConn) startFrameWrite(wm frameWriteMsg) {
|
|
|
sc.serveG.check()
|
|
|
if sc.writingFrame {
|
|
|
- panic("invariant")
|
|
|
+ panic("internal error: can only be writing one frame at a time")
|
|
|
}
|
|
|
|
|
|
st := wm.stream
|
|
|
@@ -660,7 +655,7 @@ func (sc *serverConn) scheduleFrameWrite() {
|
|
|
}
|
|
|
if sc.needToSendGoAway {
|
|
|
sc.needToSendGoAway = false
|
|
|
- sc.sendFrameWrite(frameWriteMsg{
|
|
|
+ sc.startFrameWrite(frameWriteMsg{
|
|
|
write: (*serverConn).writeGoAwayFrame,
|
|
|
v: &goAwayParams{
|
|
|
maxStreamID: sc.maxStreamID,
|
|
|
@@ -670,8 +665,8 @@ func (sc *serverConn) scheduleFrameWrite() {
|
|
|
return
|
|
|
}
|
|
|
if len(sc.writeQueue) == 0 && sc.needsFrameFlush {
|
|
|
- sc.sendFrameWrite(frameWriteMsg{write: (*serverConn).flushFrameWriter})
|
|
|
- sc.needsFrameFlush = false // after sendFrameWrite, since it sets this true
|
|
|
+ sc.startFrameWrite(frameWriteMsg{write: (*serverConn).flushFrameWriter})
|
|
|
+ sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
|
|
|
return
|
|
|
}
|
|
|
if sc.inGoAway {
|
|
|
@@ -680,7 +675,7 @@ func (sc *serverConn) scheduleFrameWrite() {
|
|
|
}
|
|
|
if sc.needToSendSettingsAck {
|
|
|
sc.needToSendSettingsAck = false
|
|
|
- sc.sendFrameWrite(frameWriteMsg{write: (*serverConn).writeSettingsAck})
|
|
|
+ sc.startFrameWrite(frameWriteMsg{write: (*serverConn).writeSettingsAck})
|
|
|
return
|
|
|
}
|
|
|
if len(sc.writeQueue) == 0 {
|
|
|
@@ -716,7 +711,7 @@ func (sc *serverConn) scheduleFrameWrite() {
|
|
|
// (because a SETTINGS frame changed our max frame size while
|
|
|
// a stream was open and writing) and cut it up into smaller
|
|
|
// bits.
|
|
|
- sc.sendFrameWrite(wm)
|
|
|
+ sc.startFrameWrite(wm)
|
|
|
}
|
|
|
|
|
|
func (sc *serverConn) goAway(code ErrCode) {
|
|
|
@@ -1277,7 +1272,7 @@ func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request) {
|
|
|
}
|
|
|
|
|
|
type frameWriteMsg struct {
|
|
|
- // write runs on the writeFrames goroutine.
|
|
|
+ // write runs on the writeFrameAsync goroutine.
|
|
|
write func(sc *serverConn, streamID uint32, v interface{}) error
|
|
|
|
|
|
v interface{} // passed to write
|