|
@@ -26,8 +26,21 @@ import (
|
|
|
"github.com/bradfitz/http2/hpack"
|
|
"github.com/bradfitz/http2/hpack"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
-// TODO: finish GOAWAY support. Consider each incoming frame type and whether
|
|
|
|
|
-// it should be ignored during a shutdown race.
|
|
|
|
|
|
|
+const (
|
|
|
|
|
+ prefaceTimeout = 5 * time.Second
|
|
|
|
|
+ firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+// TODO: finish GOAWAY support. Consider each incoming frame type and
|
|
|
|
|
+// whether it should be ignored during a shutdown race.
|
|
|
|
|
+
|
|
|
|
|
+// TODO: (edge case?) if peer sends a SETTINGS frame with e.g. a
|
|
|
|
|
+// SETTINGS_MAX_FRAME_SIZE that's lower than what we had before,
|
|
|
|
|
+// before we ACK it we have to make sure all currently-active streams
|
|
|
|
|
+// know about that and don't have existing too-large frames in flight?
|
|
|
|
|
+// Perhaps the settings processing should just wait for new frame to
|
|
|
|
|
+// be in-flight and then the frame scheduler in the serve goroutine
|
|
|
|
|
+// will be responsible for splitting things.
|
|
|
|
|
|
|
|
// Server is an HTTP/2 server.
|
|
// Server is an HTTP/2 server.
|
|
|
type Server struct {
|
|
type Server struct {
|
|
@@ -123,15 +136,17 @@ type serverConn struct {
|
|
|
flow *flow // connection-wide (not stream-specific) flow control
|
|
flow *flow // connection-wide (not stream-specific) flow control
|
|
|
|
|
|
|
|
// Everything following is owned by the serve loop; use serveG.check():
|
|
// Everything following is owned by the serve loop; use serveG.check():
|
|
|
- maxStreamID uint32 // max ever seen
|
|
|
|
|
- streams map[uint32]*stream
|
|
|
|
|
- maxWriteFrameSize uint32 // TODO: update this when settings come in
|
|
|
|
|
- initialWindowSize int32
|
|
|
|
|
- canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
|
|
|
|
|
- sentGoAway bool
|
|
|
|
|
- req requestParam // non-zero while reading request headers
|
|
|
|
|
- writingFrame bool // sent on writeFrameCh but haven't heard back on wroteFrameCh yet
|
|
|
|
|
- writeQueue []frameWriteMsg // TODO: proper scheduler, not a queue
|
|
|
|
|
|
|
+ sawFirstSettings bool // got the initial SETTINGS frame after the preface
|
|
|
|
|
+ needToSendSettingsAck bool
|
|
|
|
|
+ maxStreamID uint32 // max ever seen
|
|
|
|
|
+ streams map[uint32]*stream
|
|
|
|
|
+ maxWriteFrameSize uint32 // TODO: update this when settings come in
|
|
|
|
|
+ initialWindowSize int32
|
|
|
|
|
+ canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
|
|
|
|
|
+ sentGoAway bool
|
|
|
|
|
+ req requestParam // non-zero while reading request headers
|
|
|
|
|
+ writingFrame bool // sent on writeFrameCh but haven't heard back on wroteFrameCh yet
|
|
|
|
|
+ writeQueue []frameWriteMsg // TODO: proper scheduler, not a queue
|
|
|
|
|
|
|
|
// Owned by the writeFrames goroutine; use writeG.check():
|
|
// Owned by the writeFrames goroutine; use writeG.check():
|
|
|
headerWriteBuf bytes.Buffer
|
|
headerWriteBuf bytes.Buffer
|
|
@@ -316,39 +331,13 @@ func (sc *serverConn) serve() {
|
|
|
|
|
|
|
|
sc.vlogf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
|
|
sc.vlogf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
|
|
|
|
|
|
|
|
- if err := sc.readPreface(); err != nil {
|
|
|
|
|
- sc.condlogf(err, "Error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- f, err := sc.framer.ReadFrame() // TODO: timeout
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- sc.logf("error reading initial frame from client: %v", err)
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- sf, ok := f.(*SettingsFrame)
|
|
|
|
|
- if !ok {
|
|
|
|
|
- sc.logf("invalid initial frame type %T received from client", f)
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- if err := sf.ForeachSetting(sc.processSetting); err != nil {
|
|
|
|
|
- sc.logf("initial settings error: %v", err)
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // TODO: don't send two network packets for our SETTINGS + our
|
|
|
|
|
- // ACK of their settings. But if we make framer write to a
|
|
|
|
|
- // *bufio.Writer, that increases the per-connection memory
|
|
|
|
|
- // overhead, and there could be many idle conns. So maybe some
|
|
|
|
|
- // liveswitchWriter-like thing where we only switch to a
|
|
|
|
|
- // *bufio Writer when we really need one temporarily, else go
|
|
|
|
|
- // back to an unbuffered writes by default.
|
|
|
|
|
if err := sc.framer.WriteSettings( /* TODO: actual settings */ ); err != nil {
|
|
if err := sc.framer.WriteSettings( /* TODO: actual settings */ ); err != nil {
|
|
|
sc.logf("error writing server's initial settings: %v", err)
|
|
sc.logf("error writing server's initial settings: %v", err)
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
- if err := sc.framer.WriteSettingsAck(); err != nil {
|
|
|
|
|
- sc.logf("error writing server's ack of client's settings: %v", err)
|
|
|
|
|
|
|
+
|
|
|
|
|
+ if err := sc.readPreface(); err != nil {
|
|
|
|
|
+ sc.condlogf(err, "Error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -356,6 +345,8 @@ func (sc *serverConn) serve() {
|
|
|
go sc.writeFrames()
|
|
go sc.writeFrames()
|
|
|
defer close(sc.writeFrameCh) // shuts down writeFrames loop
|
|
defer close(sc.writeFrameCh) // shuts down writeFrames loop
|
|
|
|
|
|
|
|
|
|
+ settingsTimer := time.NewTimer(firstSettingsTimeout)
|
|
|
|
|
+
|
|
|
for {
|
|
for {
|
|
|
select {
|
|
select {
|
|
|
case wm := <-sc.wantWriteFrameCh:
|
|
case wm := <-sc.wantWriteFrameCh:
|
|
@@ -364,40 +355,16 @@ func (sc *serverConn) serve() {
|
|
|
sc.writingFrame = false
|
|
sc.writingFrame = false
|
|
|
sc.scheduleFrameWrite()
|
|
sc.scheduleFrameWrite()
|
|
|
case fg, ok := <-sc.readFrameCh:
|
|
case fg, ok := <-sc.readFrameCh:
|
|
|
- if !ok {
|
|
|
|
|
- err := <-sc.readFrameErrCh
|
|
|
|
|
- if err != io.EOF {
|
|
|
|
|
- errstr := err.Error()
|
|
|
|
|
- if !strings.Contains(errstr, "use of closed network connection") {
|
|
|
|
|
- sc.logf("client %s stopped sending frames: %v", sc.conn.RemoteAddr(), errstr)
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ if !sc.processFrameFromReader(fg, ok) {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
- f := fg.f
|
|
|
|
|
- sc.vlogf("got %v: %#v", f.Header(), f)
|
|
|
|
|
- err := sc.processFrame(f)
|
|
|
|
|
- fg.g.Done() // unblock the readFrames goroutine
|
|
|
|
|
- switch ev := err.(type) {
|
|
|
|
|
- case nil:
|
|
|
|
|
- // nothing.
|
|
|
|
|
- case StreamError:
|
|
|
|
|
- if err := sc.resetStreamInLoop(ev); err != nil {
|
|
|
|
|
- sc.logf("Error writing RSTSTream: %v", err)
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- case ConnectionError:
|
|
|
|
|
- sc.logf("Disconnecting; %v", ev)
|
|
|
|
|
- return
|
|
|
|
|
- case goAwayFlowError:
|
|
|
|
|
- if err := sc.goAway(ErrCodeFlowControl); err != nil {
|
|
|
|
|
- sc.condlogf(err, "failed to GOAWAY: %v", err)
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- default:
|
|
|
|
|
- sc.logf("Disconnection due to other error: %v", err)
|
|
|
|
|
- return
|
|
|
|
|
|
|
+ if settingsTimer.C != nil {
|
|
|
|
|
+ settingsTimer.Stop()
|
|
|
|
|
+ settingsTimer.C = nil
|
|
|
}
|
|
}
|
|
|
|
|
+ case <-settingsTimer.C:
|
|
|
|
|
+ sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
|
|
|
|
|
+ return
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -442,8 +409,26 @@ func (sc *serverConn) enqueueFrameWrite(wm frameWriteMsg) {
|
|
|
sc.writeQueue = append(sc.writeQueue, wm) // TODO: proper scheduler
|
|
sc.writeQueue = append(sc.writeQueue, wm) // TODO: proper scheduler
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (sc *serverConn) enqueueSettingsAck() {
|
|
|
|
|
+ sc.serveG.check()
|
|
|
|
|
+ // Fast path for common case:
|
|
|
|
|
+ if !sc.writingFrame {
|
|
|
|
|
+ sc.wantWriteFrameCh <- frameWriteMsg{write: (*serverConn).writeSettingsAck}
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ sc.needToSendSettingsAck = true
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (sc *serverConn) scheduleFrameWrite() {
|
|
func (sc *serverConn) scheduleFrameWrite() {
|
|
|
sc.serveG.check()
|
|
sc.serveG.check()
|
|
|
|
|
+ if sc.writingFrame {
|
|
|
|
|
+ panic("invariant")
|
|
|
|
|
+ }
|
|
|
|
|
+ if sc.needToSendSettingsAck {
|
|
|
|
|
+ sc.needToSendSettingsAck = false
|
|
|
|
|
+ sc.enqueueSettingsAck()
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
if len(sc.writeQueue) == 0 {
|
|
if len(sc.writeQueue) == 0 {
|
|
|
// TODO: flush Framer's underlying buffered writer, once that's added
|
|
// TODO: flush Framer's underlying buffered writer, once that's added
|
|
|
return
|
|
return
|
|
@@ -455,6 +440,10 @@ func (sc *serverConn) scheduleFrameWrite() {
|
|
|
copy(sc.writeQueue, sc.writeQueue[1:])
|
|
copy(sc.writeQueue, sc.writeQueue[1:])
|
|
|
sc.writeQueue = sc.writeQueue[:len(sc.writeQueue)-1]
|
|
sc.writeQueue = sc.writeQueue[:len(sc.writeQueue)-1]
|
|
|
|
|
|
|
|
|
|
+ // TODO: if wm is a data frame, make sure it's not too big
|
|
|
|
|
+ // (because a SETTINGS frame changed our max frame size while
|
|
|
|
|
+ // a stream was open and writing) and cut it up into smaller
|
|
|
|
|
+ // bits.
|
|
|
sc.writingFrame = true
|
|
sc.writingFrame = true
|
|
|
sc.writeFrameCh <- wm
|
|
sc.writeFrameCh <- wm
|
|
|
}
|
|
}
|
|
@@ -483,9 +472,64 @@ func (sc *serverConn) curHeaderStreamID() uint32 {
|
|
|
return st.id
|
|
return st.id
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// processFrameFromReader processes the serve loop's read from readFrameCh from the
|
|
|
|
|
+// frame-reading goroutine.
|
|
|
|
|
+// processFrameFromReader returns whether the connection should be kept open.
|
|
|
|
|
+func (sc *serverConn) processFrameFromReader(fg frameAndGate, fgValid bool) bool {
|
|
|
|
|
+ sc.serveG.check()
|
|
|
|
|
+ if !fgValid {
|
|
|
|
|
+ err := <-sc.readFrameErrCh
|
|
|
|
|
+ if err != io.EOF {
|
|
|
|
|
+ errstr := err.Error()
|
|
|
|
|
+ if !strings.Contains(errstr, "use of closed network connection") {
|
|
|
|
|
+ sc.logf("client %s stopped sending frames: %v", sc.conn.RemoteAddr(), errstr)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ // TODO: could we also get into this state if the peer does a half close (e.g. CloseWrite)
|
|
|
|
|
+ // because they're done sending frames but they're still wanting our open replies?
|
|
|
|
|
+ // Investigate.
|
|
|
|
|
+ return false
|
|
|
|
|
+ }
|
|
|
|
|
+ f := fg.f
|
|
|
|
|
+ sc.vlogf("got %v: %#v", f.Header(), f)
|
|
|
|
|
+ err := sc.processFrame(f)
|
|
|
|
|
+ fg.g.Done() // unblock the readFrames goroutine
|
|
|
|
|
+ if err == nil {
|
|
|
|
|
+ return true
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ switch ev := err.(type) {
|
|
|
|
|
+ case StreamError:
|
|
|
|
|
+ if err := sc.resetStreamInLoop(ev); err != nil {
|
|
|
|
|
+ sc.logf("Error writing RSTSTream: %v", err)
|
|
|
|
|
+ return false
|
|
|
|
|
+ }
|
|
|
|
|
+ return true
|
|
|
|
|
+ case goAwayFlowError:
|
|
|
|
|
+ if err := sc.goAway(ErrCodeFlowControl); err != nil {
|
|
|
|
|
+ sc.condlogf(err, "failed to GOAWAY: %v", err)
|
|
|
|
|
+ return false
|
|
|
|
|
+ }
|
|
|
|
|
+ return true
|
|
|
|
|
+ case ConnectionError:
|
|
|
|
|
+ sc.logf("disconnecting; %v", ev)
|
|
|
|
|
+ default:
|
|
|
|
|
+ sc.logf("Disconnection due to other error: %v", err)
|
|
|
|
|
+ }
|
|
|
|
|
+ return false
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (sc *serverConn) processFrame(f Frame) error {
|
|
func (sc *serverConn) processFrame(f Frame) error {
|
|
|
sc.serveG.check()
|
|
sc.serveG.check()
|
|
|
|
|
|
|
|
|
|
+ // First frame received must be SETTINGS.
|
|
|
|
|
+ if !sc.sawFirstSettings {
|
|
|
|
|
+ if _, ok := f.(*SettingsFrame); !ok {
|
|
|
|
|
+ return ConnectionError(ErrCodeProtocol)
|
|
|
|
|
+ }
|
|
|
|
|
+ sc.sawFirstSettings = true
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
if s := sc.curHeaderStreamID(); s != 0 {
|
|
if s := sc.curHeaderStreamID(); s != 0 {
|
|
|
if cf, ok := f.(*ContinuationFrame); !ok {
|
|
if cf, ok := f.(*ContinuationFrame); !ok {
|
|
|
return ConnectionError(ErrCodeProtocol)
|
|
return ConnectionError(ErrCodeProtocol)
|
|
@@ -567,7 +611,19 @@ func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
|
|
|
|
|
|
|
|
func (sc *serverConn) processSettings(f *SettingsFrame) error {
|
|
func (sc *serverConn) processSettings(f *SettingsFrame) error {
|
|
|
sc.serveG.check()
|
|
sc.serveG.check()
|
|
|
- return f.ForeachSetting(sc.processSetting)
|
|
|
|
|
|
|
+ if f.IsAck() {
|
|
|
|
|
+ // TODO: do we need to do anything?
|
|
|
|
|
+ return nil
|
|
|
|
|
+ }
|
|
|
|
|
+ if err := f.ForeachSetting(sc.processSetting); err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ sc.enqueueSettingsAck()
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (sc *serverConn) writeSettingsAck(_ interface{}) error {
|
|
|
|
|
+ return sc.framer.WriteSettingsAck()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (sc *serverConn) processSetting(s Setting) error {
|
|
func (sc *serverConn) processSetting(s Setting) error {
|