|
|
@@ -201,8 +201,7 @@ func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler) {
|
|
|
bw: newBufferedWriter(c),
|
|
|
handler: h,
|
|
|
streams: make(map[uint32]*stream),
|
|
|
- readFrameCh: make(chan frameAndGate),
|
|
|
- readFrameErrCh: make(chan error, 1), // must be buffered for 1
|
|
|
+ readFrameCh: make(chan readFrameResult),
|
|
|
wantWriteFrameCh: make(chan frameWriteMsg, 8),
|
|
|
wroteFrameCh: make(chan struct{}, 1), // buffered; one send in reading goroutine
|
|
|
bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
|
|
|
@@ -309,16 +308,6 @@ func (sc *serverConn) rejectConn(err ErrCode, debug string) {
|
|
|
sc.conn.Close()
|
|
|
}
|
|
|
|
|
|
-// frameAndGates coordinates the readFrames and serve
|
|
|
-// goroutines. Because the Framer interface only permits the most
|
|
|
-// recently-read Frame from being accessed, the readFrames goroutine
|
|
|
-// blocks until it has a frame, passes it to serve, and then waits for
|
|
|
-// serve to be done with it before reading the next one.
|
|
|
-type frameAndGate struct {
|
|
|
- f Frame
|
|
|
- g gate
|
|
|
-}
|
|
|
-
|
|
|
type serverConn struct {
|
|
|
// Immutable:
|
|
|
srv *Server
|
|
|
@@ -328,9 +317,8 @@ type serverConn struct {
|
|
|
handler http.Handler
|
|
|
framer *Framer
|
|
|
hpackDecoder *hpack.Decoder
|
|
|
- doneServing chan struct{} // closed when serverConn.serve ends
|
|
|
- readFrameCh chan frameAndGate // written by serverConn.readFrames
|
|
|
- readFrameErrCh chan error
|
|
|
+ doneServing chan struct{} // closed when serverConn.serve ends
|
|
|
+ readFrameCh chan readFrameResult // written by serverConn.readFrames
|
|
|
wantWriteFrameCh chan frameWriteMsg // from handlers -> serve
|
|
|
wroteFrameCh chan struct{} // from writeFrameAsync -> serve, tickles more frame writes
|
|
|
bodyReadCh chan bodyReadMsg // from handlers -> serve
|
|
|
@@ -541,24 +529,34 @@ func (sc *serverConn) canonicalHeader(v string) string {
|
|
|
return cv
|
|
|
}
|
|
|
|
|
|
+type readFrameResult struct {
|
|
|
+ f Frame // valid until readMore is called
|
|
|
+ err error
|
|
|
+
|
|
|
+ // readMore should be called once the consumer no longer needs or
|
|
|
+ // retains f. After readMore, f is invalid and more frames can be
|
|
|
+ // read.
|
|
|
+ readMore func()
|
|
|
+}
|
|
|
+
|
|
|
// readFrames is the loop that reads incoming frames.
|
|
|
+// It takes care to only read one frame at a time, blocking until the
|
|
|
+// consumer is done with the frame.
|
|
|
// It's run on its own goroutine.
|
|
|
func (sc *serverConn) readFrames() {
|
|
|
- g := make(gate, 1)
|
|
|
+ gate := make(gate)
|
|
|
for {
|
|
|
f, err := sc.framer.ReadFrame()
|
|
|
- if err != nil {
|
|
|
- sc.readFrameErrCh <- err
|
|
|
- close(sc.readFrameCh)
|
|
|
+ select {
|
|
|
+ case sc.readFrameCh <- readFrameResult{f, err, gate.Done}:
|
|
|
+ case <-sc.doneServing:
|
|
|
+ return
|
|
|
+ }
|
|
|
+ select {
|
|
|
+ case <-gate:
|
|
|
+ case <-sc.doneServing:
|
|
|
return
|
|
|
}
|
|
|
- sc.readFrameCh <- frameAndGate{f, g}
|
|
|
- // We can't read another frame until this one is
|
|
|
- // processed, as the ReadFrame interface doesn't copy
|
|
|
- // memory. The Frame accessor methods access the last
|
|
|
- // frame's (shared) buffer. So we wait for the
|
|
|
- // serve goroutine to tell us it's done:
|
|
|
- g.Wait()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -648,13 +646,11 @@ func (sc *serverConn) serve() {
|
|
|
}
|
|
|
sc.writingFrame = false
|
|
|
sc.scheduleFrameWrite()
|
|
|
- case fg, ok := <-sc.readFrameCh:
|
|
|
- if !ok {
|
|
|
- sc.readFrameCh = nil
|
|
|
- }
|
|
|
- if !sc.processFrameFromReader(fg, ok) {
|
|
|
+ case res := <-sc.readFrameCh:
|
|
|
+ if !sc.processFrameFromReader(res) {
|
|
|
return
|
|
|
}
|
|
|
+ res.readMore()
|
|
|
if settingsTimer.C != nil {
|
|
|
settingsTimer.Stop()
|
|
|
settingsTimer.C = nil
|
|
|
@@ -901,17 +897,15 @@ func (sc *serverConn) curHeaderStreamID() uint32 {
|
|
|
// processFrameFromReader processes the serve loop's read from readFrameCh from the
|
|
|
// frame-reading goroutine.
|
|
|
// processFrameFromReader returns whether the connection should be kept open.
|
|
|
-func (sc *serverConn) processFrameFromReader(fg frameAndGate, fgValid bool) bool {
|
|
|
+func (sc *serverConn) processFrameFromReader(res readFrameResult) bool {
|
|
|
sc.serveG.check()
|
|
|
- var clientGone bool
|
|
|
- var err error
|
|
|
- if !fgValid {
|
|
|
- err = <-sc.readFrameErrCh
|
|
|
+ err := res.err
|
|
|
+ if err != nil {
|
|
|
if err == ErrFrameTooLarge {
|
|
|
sc.goAway(ErrCodeFrameSize)
|
|
|
return true // goAway will close the loop
|
|
|
}
|
|
|
- clientGone = err == io.EOF || strings.Contains(err.Error(), "use of closed network connection")
|
|
|
+ clientGone := err == io.EOF || strings.Contains(err.Error(), "use of closed network connection")
|
|
|
if clientGone {
|
|
|
// TODO: could we also get into this state if
|
|
|
// the peer does a half close
|
|
|
@@ -923,13 +917,10 @@ func (sc *serverConn) processFrameFromReader(fg frameAndGate, fgValid bool) bool
|
|
|
// just for testing we could have a non-TLS mode.
|
|
|
return false
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- if fgValid {
|
|
|
- f := fg.f
|
|
|
+ } else {
|
|
|
+ f := res.f
|
|
|
sc.vlogf("got %v: %#v", f.Header(), f)
|
|
|
err = sc.processFrame(f)
|
|
|
- fg.g.Done() // unblock the readFrames goroutine
|
|
|
if err == nil {
|
|
|
return true
|
|
|
}
|
|
|
@@ -947,13 +938,13 @@ func (sc *serverConn) processFrameFromReader(fg frameAndGate, fgValid bool) bool
|
|
|
sc.goAway(ErrCode(ev))
|
|
|
return true // goAway will handle shutdown
|
|
|
default:
|
|
|
- if !fgValid {
|
|
|
+ if res.err != nil {
|
|
|
sc.logf("disconnecting; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err)
|
|
|
} else {
|
|
|
sc.logf("disconnection due to other error: %v", err)
|
|
|
}
|
|
|
+ return false
|
|
|
}
|
|
|
- return false
|
|
|
}
|
|
|
|
|
|
func (sc *serverConn) processFrame(f Frame) error {
|