|
|
@@ -164,8 +164,7 @@ func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler) {
|
|
|
readFrameCh: make(chan frameAndGate),
|
|
|
readFrameErrCh: make(chan error, 1), // must be buffered for 1
|
|
|
wantWriteFrameCh: make(chan frameWriteMsg, 8),
|
|
|
- writeFrameCh: make(chan frameWriteMsg, 1), // may be 0 or 1, but more is useless. (max 1 in flight)
|
|
|
- wroteFrameCh: make(chan struct{}, 1), // TODO: consider 0. will deadlock currently in sendFrameWrite in sentReset case
|
|
|
+ wroteFrameCh: make(chan struct{}, 1), // TODO: consider 0. will deadlock currently in sendFrameWrite in sentReset case
|
|
|
flow: newFlow(initialWindowSize),
|
|
|
doneServing: make(chan struct{}),
|
|
|
advMaxStreams: srv.maxConcurrentStreams(),
|
|
|
@@ -211,8 +210,7 @@ type serverConn struct {
|
|
|
readFrameCh chan frameAndGate // written by serverConn.readFrames
|
|
|
readFrameErrCh chan error
|
|
|
wantWriteFrameCh chan frameWriteMsg // from handlers -> serve
|
|
|
- writeFrameCh chan frameWriteMsg // from serve -> writeFrames
|
|
|
- wroteFrameCh chan struct{} // from writeFrames -> serve, tickles more sends on writeFrameCh
|
|
|
+ wroteFrameCh chan struct{} // from writeFrames -> serve, tickles more frame writes
|
|
|
testHookCh chan func() // code to run on the serve loop
|
|
|
|
|
|
serveG goroutineLock // used to verify funcs are on serve()
|
|
|
@@ -234,8 +232,8 @@ type serverConn struct {
|
|
|
maxHeaderListSize uint32 // zero means unknown (default)
|
|
|
canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
|
|
|
req requestParam // non-zero while reading request headers
|
|
|
- writingFrame bool // sent on writeFrameCh but haven't heard back on wroteFrameCh yet
|
|
|
- needsFrameFlush bool // last frame to writeFrameCh wasn't a flush
|
|
|
+ writingFrame bool // started write goroutine but haven't heard back on wroteFrameCh
|
|
|
+ needsFrameFlush bool // last frame write wasn't a flush
|
|
|
writeQueue []frameWriteMsg // TODO: proper scheduler, not a queue
|
|
|
inGoAway bool // we've started to or sent GOAWAY
|
|
|
needToSendGoAway bool // we need to schedule a GOAWAY frame write
|
|
|
@@ -414,26 +412,25 @@ func (sc *serverConn) readFrames() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// writeFrames is the loop that writes frames to the peer
|
|
|
-// and is responsible for prioritization and buffering.
|
|
|
-// It's run on its own goroutine.
|
|
|
-func (sc *serverConn) writeFrames() {
|
|
|
+// writeFrameAsync runs in its own goroutine and writes a single frame
|
|
|
+// and then reports when it's done.
|
|
|
+// At most one goroutine can be running writeFrameAsync at a time per
|
|
|
+// serverConn.
|
|
|
+func (sc *serverConn) writeFrameAsync(wm frameWriteMsg) {
|
|
|
sc.writeG = newGoroutineLock()
|
|
|
- for wm := range sc.writeFrameCh {
|
|
|
- var streamID uint32
|
|
|
- if wm.stream != nil {
|
|
|
- streamID = wm.stream.id
|
|
|
- }
|
|
|
- err := wm.write(sc, streamID, wm.v)
|
|
|
- if ch := wm.done; ch != nil {
|
|
|
- select {
|
|
|
- case ch <- err:
|
|
|
- default:
|
|
|
- panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wm.v))
|
|
|
- }
|
|
|
+ var streamID uint32
|
|
|
+ if wm.stream != nil {
|
|
|
+ streamID = wm.stream.id
|
|
|
+ }
|
|
|
+ err := wm.write(sc, streamID, wm.v)
|
|
|
+ if ch := wm.done; ch != nil {
|
|
|
+ select {
|
|
|
+ case ch <- err:
|
|
|
+ default:
|
|
|
+ panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wm.v))
|
|
|
}
|
|
|
- sc.wroteFrameCh <- struct{}{} // tickle frame selection scheduler
|
|
|
}
|
|
|
+ sc.wroteFrameCh <- struct{}{} // tickle frame selection scheduler
|
|
|
}
|
|
|
|
|
|
func (sc *serverConn) flushFrameWriter(uint32, interface{}) error {
|
|
|
@@ -460,8 +457,7 @@ func (sc *serverConn) serve() {
|
|
|
defer sc.conn.Close()
|
|
|
defer sc.closeAllStreamsOnConnClose()
|
|
|
defer sc.stopShutdownTimer()
|
|
|
- defer close(sc.doneServing) // unblocks handlers trying to send
|
|
|
- defer close(sc.writeFrameCh) // stop the writeFrames loop
|
|
|
+ defer close(sc.doneServing) // unblocks handlers trying to send
|
|
|
|
|
|
sc.vlogf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
|
|
|
|
|
|
@@ -472,8 +468,7 @@ func (sc *serverConn) serve() {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- go sc.readFrames() // closed by defer sc.conn.Close above
|
|
|
- go sc.writeFrames() // source closed in stopServing
|
|
|
+ go sc.readFrames() // closed by defer sc.conn.Close above
|
|
|
|
|
|
settingsTimer := time.NewTimer(firstSettingsTimeout)
|
|
|
for {
|
|
|
@@ -643,7 +638,7 @@ func (sc *serverConn) sendFrameWrite(wm frameWriteMsg) {
|
|
|
sc.closeStream(st, nil)
|
|
|
}
|
|
|
}
|
|
|
- sc.writeFrameCh <- wm
|
|
|
+ go sc.writeFrameAsync(wm)
|
|
|
}
|
|
|
|
|
|
// scheduleFrameWrite tickles the frame writing scheduler.
|