|
|
@@ -27,8 +27,28 @@ import (
|
|
|
)
|
|
|
|
|
|
const (
|
|
|
- prefaceTimeout = 5 * time.Second
|
|
|
- firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
|
|
|
+ prefaceTimeout = 5 * time.Second
|
|
|
+ firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
|
|
|
+ handlerChunkWriteSize = 4 << 10
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ errClientDisconnected = errors.New("client disconnected")
|
|
|
+ errClosedBody = errors.New("body closed by handler")
|
|
|
+)
|
|
|
+
|
|
|
+var responseWriterStatePool = sync.Pool{
|
|
|
+ New: func() interface{} {
|
|
|
+ rws := &responseWriterState{}
|
|
|
+ rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize)
|
|
|
+ return rws
|
|
|
+ },
|
|
|
+}
|
|
|
+
|
|
|
+// Test hooks.
|
|
|
+var (
|
|
|
+ testHookOnConn func()
|
|
|
+ testHookGetServerConn func(*serverConn)
|
|
|
)
|
|
|
|
|
|
// TODO: finish GOAWAY support. Consider each incoming frame type and
|
|
|
@@ -78,8 +98,6 @@ func (s *Server) maxReadFrameSize() uint32 {
|
|
|
return defaultMaxReadFrameSize
|
|
|
}
|
|
|
|
|
|
-var testHookOnConn func() // for testing
|
|
|
-
|
|
|
// ConfigureServer adds HTTP/2 support to a net/http Server.
|
|
|
//
|
|
|
// The configuration conf may be nil.
|
|
|
@@ -114,8 +132,6 @@ func ConfigureServer(s *http.Server, conf *Server) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-var testHookGetServerConn func(*serverConn)
|
|
|
-
|
|
|
func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler) {
|
|
|
sc := &serverConn{
|
|
|
srv: srv,
|
|
|
@@ -201,8 +217,8 @@ type serverConn struct {
|
|
|
inGoAway bool // we've started to or sent GOAWAY
|
|
|
needToSendGoAway bool // we need to schedule a GOAWAY frame write
|
|
|
goAwayCode ErrCode
|
|
|
- shutdownTimerCh <-chan time.Time
|
|
|
- shutdownTimer *time.Timer
|
|
|
+ shutdownTimerCh <-chan time.Time // nil until used
|
|
|
+ shutdownTimer *time.Timer // nil until used
|
|
|
|
|
|
// Owned by the writeFrames goroutine; use writeG.check():
|
|
|
headerWriteBuf bytes.Buffer
|
|
|
@@ -402,8 +418,6 @@ func (sc *serverConn) flushFrameWriter(uint32, interface{}) error {
|
|
|
return sc.bw.Flush() // may block on the network
|
|
|
}
|
|
|
|
|
|
-var errClientDisconnected = errors.New("client disconnected")
|
|
|
-
|
|
|
func (sc *serverConn) closeAllStreamsOnConnClose() {
|
|
|
sc.serveG.check()
|
|
|
for _, st := range sc.streams {
|
|
|
@@ -428,7 +442,7 @@ func (sc *serverConn) serve() {
|
|
|
|
|
|
sc.vlogf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
|
|
|
|
|
|
- sc.enqueueFrameWrite(frameWriteMsg{write: (*serverConn).sendInitialSettings})
|
|
|
+ sc.writeFrame(frameWriteMsg{write: (*serverConn).sendInitialSettings})
|
|
|
|
|
|
if err := sc.readPreface(); err != nil {
|
|
|
sc.condlogf(err, "error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
|
|
|
@@ -442,7 +456,7 @@ func (sc *serverConn) serve() {
|
|
|
for {
|
|
|
select {
|
|
|
case wm := <-sc.wantWriteFrameCh:
|
|
|
- sc.enqueueFrameWrite(wm)
|
|
|
+ sc.writeFrame(wm)
|
|
|
case <-sc.wroteFrameCh:
|
|
|
sc.writingFrame = false
|
|
|
sc.scheduleFrameWrite()
|
|
|
@@ -520,7 +534,7 @@ func (sc *serverConn) writeData(stream *stream, data *dataWriteParams, ch chan e
|
|
|
// loop and modify the frame scheduler there to write chunks
|
|
|
// of req as tokens allow. Don't necessarily write it all at
|
|
|
// once in one frame.
|
|
|
- sc.writeFrame(frameWriteMsg{
|
|
|
+ sc.writeFrameFromHandler(frameWriteMsg{
|
|
|
write: (*serverConn).writeDataFrame,
|
|
|
cost: uint32(len(data.p)),
|
|
|
stream: stream,
|
|
|
@@ -536,14 +550,14 @@ func (sc *serverConn) writeData(stream *stream, data *dataWriteParams, ch chan e
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// writeFrame sends wm to sc.wantWriteFrameCh, but aborts if the
|
|
|
-// connection has gone away.
|
|
|
+// writeFrameFromHandler sends wm to sc.wantWriteFrameCh, but aborts
|
|
|
+// if the connection has gone away.
|
|
|
//
|
|
|
// This must not be run from the serve goroutine itself, else it might
|
|
|
// deadlock writing to sc.wantWriteFrameCh (which is only mildly
|
|
|
// buffered and is read by serve itself). If you're on the serve
|
|
|
-// goroutine, call enqueueFrameWrite instead.
|
|
|
-func (sc *serverConn) writeFrame(wm frameWriteMsg) {
|
|
|
+// goroutine, call writeFrame instead.
|
|
|
+func (sc *serverConn) writeFrameFromHandler(wm frameWriteMsg) {
|
|
|
sc.serveG.checkNotOn() // NOT
|
|
|
select {
|
|
|
case sc.wantWriteFrameCh <- wm:
|
|
|
@@ -552,13 +566,13 @@ func (sc *serverConn) writeFrame(wm frameWriteMsg) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// enqueueFrameWrite either sends wm to the writeFrames goroutine, or
|
|
|
+// writeFrame either sends wm to the writeFrames goroutine, or
|
|
|
// enqueues it for the future (with no pushback; the serve goroutine
|
|
|
// never blocks!), for sending when the currently-being-written frame
|
|
|
// is done writing.
|
|
|
//
|
|
|
// If you're not on the serve goroutine, use writeFrame instead.
|
|
|
-func (sc *serverConn) enqueueFrameWrite(wm frameWriteMsg) {
|
|
|
+func (sc *serverConn) writeFrame(wm frameWriteMsg) {
|
|
|
sc.serveG.check()
|
|
|
// Fast path for common case:
|
|
|
if !sc.writingFrame {
|
|
|
@@ -609,26 +623,18 @@ func (sc *serverConn) sendFrameWrite(wm frameWriteMsg) {
|
|
|
sc.writeFrameCh <- wm
|
|
|
}
|
|
|
|
|
|
-func (sc *serverConn) sendFrameWriteFlush() {
|
|
|
- sc.serveG.check()
|
|
|
- if sc.writingFrame {
|
|
|
- panic("invariant")
|
|
|
- }
|
|
|
- sc.writingFrame = true
|
|
|
- sc.needsFrameFlush = false
|
|
|
- sc.writeFrameCh <- frameWriteMsg{write: (*serverConn).flushFrameWriter}
|
|
|
-}
|
|
|
-
|
|
|
-func (sc *serverConn) enqueueSettingsAck() {
|
|
|
- sc.serveG.check()
|
|
|
- if !sc.writingFrame {
|
|
|
- sc.needToSendSettingsAck = false
|
|
|
- sc.sendFrameWrite(frameWriteMsg{write: (*serverConn).writeSettingsAck})
|
|
|
- return
|
|
|
- }
|
|
|
- sc.needToSendSettingsAck = true
|
|
|
-}
|
|
|
-
|
|
|
+// scheduleFrameWrite tickles the frame writing scheduler.
|
|
|
+//
|
|
|
+// If a frame is already being written, nothing happens. This will be called again
|
|
|
+// when the frame is done being written.
|
|
|
+//
|
|
|
+// If a frame isn't being written we need to send one, the best frame
|
|
|
+// to send is selected, preferring first things that aren't
|
|
|
+// stream-specific (e.g. ACKing settings), and then finding the
|
|
|
+// highest priority stream.
|
|
|
+//
|
|
|
+// If a frame isn't being written and there's nothing else to send, we
|
|
|
+// flush the write buffer.
|
|
|
func (sc *serverConn) scheduleFrameWrite() {
|
|
|
sc.serveG.check()
|
|
|
if sc.writingFrame {
|
|
|
@@ -646,7 +652,8 @@ func (sc *serverConn) scheduleFrameWrite() {
|
|
|
return
|
|
|
}
|
|
|
if len(sc.writeQueue) == 0 && sc.needsFrameFlush {
|
|
|
- sc.sendFrameWriteFlush()
|
|
|
+ sc.sendFrameWrite(frameWriteMsg{write: (*serverConn).flushFrameWriter})
|
|
|
+ sc.needsFrameFlush = false // after sendFrameWrite, since it sets this true
|
|
|
return
|
|
|
}
|
|
|
if sc.inGoAway {
|
|
|
@@ -654,7 +661,8 @@ func (sc *serverConn) scheduleFrameWrite() {
|
|
|
return
|
|
|
}
|
|
|
if sc.needToSendSettingsAck {
|
|
|
- sc.enqueueSettingsAck()
|
|
|
+ sc.needToSendSettingsAck = false
|
|
|
+ sc.sendFrameWrite(frameWriteMsg{write: (*serverConn).writeSettingsAck})
|
|
|
return
|
|
|
}
|
|
|
if len(sc.writeQueue) == 0 {
|
|
|
@@ -734,7 +742,7 @@ func (sc *serverConn) resetStreamInLoop(se StreamError) {
|
|
|
if !ok {
|
|
|
panic(fmt.Sprintf("invariant. closing non-open stream %d", se.StreamID))
|
|
|
}
|
|
|
- sc.enqueueFrameWrite(frameWriteMsg{
|
|
|
+ sc.writeFrame(frameWriteMsg{
|
|
|
write: (*serverConn).writeRSTStreamFrame,
|
|
|
v: &se,
|
|
|
})
|
|
|
@@ -859,7 +867,7 @@ func (sc *serverConn) processPing(f *PingFrame) error {
|
|
|
// PROTOCOL_ERROR."
|
|
|
return ConnectionError(ErrCodeProtocol)
|
|
|
}
|
|
|
- sc.enqueueFrameWrite(frameWriteMsg{
|
|
|
+ sc.writeFrame(frameWriteMsg{
|
|
|
write: (*serverConn).writePingAck,
|
|
|
v: f,
|
|
|
})
|
|
|
@@ -937,7 +945,8 @@ func (sc *serverConn) processSettings(f *SettingsFrame) error {
|
|
|
if err := f.ForeachSetting(sc.processSetting); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- sc.enqueueSettingsAck()
|
|
|
+ sc.needToSendSettingsAck = true
|
|
|
+ sc.scheduleFrameWrite()
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
@@ -1217,16 +1226,6 @@ func (sc *serverConn) newWriterAndRequest() (*responseWriter, *http.Request, err
|
|
|
return rw, req, nil
|
|
|
}
|
|
|
|
|
|
-const handlerChunkWriteSize = 4 << 10
|
|
|
-
|
|
|
-var responseWriterStatePool = sync.Pool{
|
|
|
- New: func() interface{} {
|
|
|
- rws := &responseWriterState{}
|
|
|
- rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize)
|
|
|
- return rws
|
|
|
- },
|
|
|
-}
|
|
|
-
|
|
|
// Run on its own goroutine.
|
|
|
func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request) {
|
|
|
defer rw.handlerDone()
|
|
|
@@ -1272,7 +1271,7 @@ func (sc *serverConn) writeHeaders(req headerWriteReq, tempCh chan error) {
|
|
|
// mutates it.
|
|
|
errc = tempCh
|
|
|
}
|
|
|
- sc.writeFrame(frameWriteMsg{
|
|
|
+ sc.writeFrameFromHandler(frameWriteMsg{
|
|
|
write: (*serverConn).writeHeadersFrame,
|
|
|
v: req,
|
|
|
stream: req.stream,
|
|
|
@@ -1329,7 +1328,7 @@ func (sc *serverConn) writeHeadersFrame(streamID uint32, v interface{}) error {
|
|
|
// called from handler goroutines.
|
|
|
func (sc *serverConn) write100ContinueHeaders(st *stream) {
|
|
|
sc.serveG.checkNotOn() // NOT
|
|
|
- sc.writeFrame(frameWriteMsg{
|
|
|
+ sc.writeFrameFromHandler(frameWriteMsg{
|
|
|
write: (*serverConn).write100ContinueHeadersFrame,
|
|
|
stream: st,
|
|
|
})
|
|
|
@@ -1365,7 +1364,7 @@ func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
|
|
|
}
|
|
|
const maxUint32 = 2147483647
|
|
|
for n >= maxUint32 {
|
|
|
- sc.writeFrame(frameWriteMsg{
|
|
|
+ sc.writeFrameFromHandler(frameWriteMsg{
|
|
|
write: (*serverConn).sendWindowUpdateInLoop,
|
|
|
v: windowUpdateReq{maxUint32},
|
|
|
stream: st,
|
|
|
@@ -1373,7 +1372,7 @@ func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
|
|
|
n -= maxUint32
|
|
|
}
|
|
|
if n > 0 {
|
|
|
- sc.writeFrame(frameWriteMsg{
|
|
|
+ sc.writeFrameFromHandler(frameWriteMsg{
|
|
|
write: (*serverConn).sendWindowUpdateInLoop,
|
|
|
v: windowUpdateReq{uint32(n)},
|
|
|
stream: st,
|
|
|
@@ -1400,8 +1399,6 @@ type requestBody struct {
|
|
|
needsContinue bool // need to send a 100-continue
|
|
|
}
|
|
|
|
|
|
-var errClosedBody = errors.New("body closed by handler")
|
|
|
-
|
|
|
func (b *requestBody) Close() error {
|
|
|
if b.pipe != nil {
|
|
|
b.pipe.Close(errClosedBody)
|