|
|
@@ -78,7 +78,7 @@ func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler) {
|
|
|
handler: h,
|
|
|
framer: NewFramer(c, c), // TODO: write to a (custom?) buffered writer that can alternate when it's in buffered mode.
|
|
|
streams: make(map[uint32]*stream),
|
|
|
- readFrameCh: make(chan frameAndProcessed),
|
|
|
+ readFrameCh: make(chan frameAndGate),
|
|
|
readFrameErrCh: make(chan error, 1), // must be buffered for 1
|
|
|
wantWriteFrameCh: make(chan frameWriteMsg, 8),
|
|
|
writeFrameCh: make(chan frameWriteMsg, 1), // may be 0 or 1, but more is useless. (max 1 in flight)
|
|
|
@@ -94,13 +94,14 @@ func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler) {
|
|
|
sc.serve()
|
|
|
}
|
|
|
|
|
|
-// frameAndProcessed coordinates the readFrames and serve goroutines, since
|
|
|
-// the Framer interface only permits the most recently-read Frame from being
|
|
|
-// accessed. The serve goroutine sends on processed to signal to the readFrames
|
|
|
-// goroutine that another frame may be read.
|
|
|
-type frameAndProcessed struct {
|
|
|
- f Frame
|
|
|
- processed chan struct{}
|
|
|
+// frameAndGates coordinates the readFrames and serve
|
|
|
+// goroutines. Because the Framer interface only permits the most
|
|
|
+// recently-read Frame from being accessed, the readFrames goroutine
|
|
|
+// blocks until it has a frame, passes it to serve, and then waits for
|
|
|
+// serve to be done with it before reading the next one.
|
|
|
+type frameAndGate struct {
|
|
|
+ f Frame
|
|
|
+ g gate
|
|
|
}
|
|
|
|
|
|
type serverConn struct {
|
|
|
@@ -110,8 +111,8 @@ type serverConn struct {
|
|
|
handler http.Handler
|
|
|
framer *Framer
|
|
|
hpackDecoder *hpack.Decoder
|
|
|
- doneServing chan struct{} // closed when serverConn.serve ends
|
|
|
- readFrameCh chan frameAndProcessed // written by serverConn.readFrames
|
|
|
+ doneServing chan struct{} // closed when serverConn.serve ends
|
|
|
+ readFrameCh chan frameAndGate // written by serverConn.readFrames
|
|
|
readFrameErrCh chan error
|
|
|
wantWriteFrameCh chan frameWriteMsg // from handlers -> serve
|
|
|
writeFrameCh chan frameWriteMsg // from serve -> writeFrames
|
|
|
@@ -277,7 +278,7 @@ func (sc *serverConn) canonicalHeader(v string) string {
|
|
|
// readFrames is the loop that reads incoming frames.
|
|
|
// It's run on its own goroutine.
|
|
|
func (sc *serverConn) readFrames() {
|
|
|
- processed := make(chan struct{}, 1)
|
|
|
+ g := make(gate, 1)
|
|
|
for {
|
|
|
f, err := sc.framer.ReadFrame()
|
|
|
if err != nil {
|
|
|
@@ -285,8 +286,8 @@ func (sc *serverConn) readFrames() {
|
|
|
close(sc.readFrameCh)
|
|
|
return
|
|
|
}
|
|
|
- sc.readFrameCh <- frameAndProcessed{f, processed}
|
|
|
- <-processed
|
|
|
+ sc.readFrameCh <- frameAndGate{f, g}
|
|
|
+ g.Wait()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -362,7 +363,7 @@ func (sc *serverConn) serve() {
|
|
|
case <-sc.wroteFrameCh:
|
|
|
sc.writingFrame = false
|
|
|
sc.scheduleFrameWrite()
|
|
|
- case fp, ok := <-sc.readFrameCh:
|
|
|
+ case fg, ok := <-sc.readFrameCh:
|
|
|
if !ok {
|
|
|
err := <-sc.readFrameErrCh
|
|
|
if err != io.EOF {
|
|
|
@@ -373,10 +374,10 @@ func (sc *serverConn) serve() {
|
|
|
}
|
|
|
return
|
|
|
}
|
|
|
- f := fp.f
|
|
|
+ f := fg.f
|
|
|
sc.vlogf("got %v: %#v", f.Header(), f)
|
|
|
err := sc.processFrame(f)
|
|
|
- fp.processed <- struct{}{} // let readFrames proceed
|
|
|
+ fg.g.Done() // unblock the readFrames goroutine
|
|
|
switch ev := err.(type) {
|
|
|
case nil:
|
|
|
// nothing.
|