|
|
@@ -34,7 +34,7 @@ const (
|
|
|
prefaceTimeout = 10 * time.Second
|
|
|
firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
|
|
|
handlerChunkWriteSize = 4 << 10
|
|
|
- defaultMaxStreams = 250
|
|
|
+ defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to?
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
@@ -207,6 +207,7 @@ func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler) {
|
|
|
pushEnabled: true,
|
|
|
}
|
|
|
sc.flow.add(initialWindowSize)
|
|
|
+ sc.inflow.add(initialWindowSize)
|
|
|
sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
|
|
|
sc.hpackDecoder = hpack.NewDecoder(initialHeaderTableSize, sc.onNewHeaderField)
|
|
|
|
|
|
@@ -317,7 +318,8 @@ type serverConn struct {
|
|
|
wroteFrameCh chan struct{} // from writeFrameAsync -> serve, tickles more frame writes
|
|
|
bodyReadCh chan bodyReadMsg // from handlers -> serve
|
|
|
testHookCh chan func() // code to run on the serve loop
|
|
|
- flow flow // connection-wide (not stream-specific) flow control
|
|
|
+ flow flow // conn-wide (not stream-specific) outbound flow control
|
|
|
+ inflow flow // conn-wide inbound flow control
|
|
|
tlsState *tls.ConnectionState // shared by all handlers, like net/http
|
|
|
|
|
|
// Everything following is owned by the serve loop; use serveG.check():
|
|
|
@@ -374,18 +376,19 @@ type requestParam struct {
|
|
|
type stream struct {
|
|
|
// immutable:
|
|
|
id uint32
|
|
|
- flow flow // limits writing from Handler to client
|
|
|
body *pipe // non-nil if expecting DATA frames
|
|
|
cw closeWaiter // closed wait stream transitions to closed state
|
|
|
|
|
|
// owned by serverConn's serve loop:
|
|
|
+ bodyBytes int64 // body bytes seen so far
|
|
|
+ declBodyBytes int64 // or -1 if undeclared
|
|
|
+ flow flow // limits writing from Handler to client
|
|
|
+ inflow flow // what the client is allowed to POST/etc to us
|
|
|
parent *stream // or nil
|
|
|
weight uint8
|
|
|
state streamState
|
|
|
- bodyBytes int64 // body bytes seen so far
|
|
|
- declBodyBytes int64 // or -1 if undeclared
|
|
|
- sentReset bool // only true once detached from streams map
|
|
|
- gotReset bool // only true once detacted from streams map
|
|
|
+ sentReset bool // only true once detached from streams map
|
|
|
+ gotReset bool // only true once detacted from streams map
|
|
|
}
|
|
|
|
|
|
func (sc *serverConn) Framer() *Framer { return sc.framer }
|
|
|
@@ -651,7 +654,7 @@ func (sc *serverConn) readPreface() error {
|
|
|
errc <- nil
|
|
|
}
|
|
|
}()
|
|
|
- timer := time.NewTimer(5 * time.Second) // TODO: configurable on *Server?
|
|
|
+ timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
|
|
|
defer timer.Stop()
|
|
|
select {
|
|
|
case <-timer.C:
|
|
|
@@ -1145,13 +1148,11 @@ func (sc *serverConn) processData(f *DataFrame) error {
|
|
|
return StreamError{id, ErrCodeStreamClosed}
|
|
|
}
|
|
|
if len(data) > 0 {
|
|
|
- // TODO: verify they're allowed to write with the flow
|
|
|
- // control window we'd advertised to them. (currently
|
|
|
- // this is fails elsewhere, in that the body buffer is
|
|
|
- // always 65k, the default initial window size, but
|
|
|
- // once that's fixed to grow and shrink on demand,
|
|
|
- // we'll need to be stricter before that, or in the
|
|
|
- // buffer code)
|
|
|
+ // Check whether the client has flow control quota.
|
|
|
+ if int(st.inflow.available()) < len(data) {
|
|
|
+ return StreamError{id, ErrCodeFlowControl}
|
|
|
+ }
|
|
|
+ st.inflow.take(int32(len(data)))
|
|
|
wrote, err := st.body.Write(data)
|
|
|
if err != nil {
|
|
|
return StreamError{id, ErrCodeStreamClosed}
|
|
|
@@ -1198,13 +1199,16 @@ func (sc *serverConn) processHeaders(f *HeadersFrame) error {
|
|
|
id: id,
|
|
|
state: stateOpen,
|
|
|
}
|
|
|
- // connection-level flow control is shared by all streams.
|
|
|
- st.flow.conn = &sc.flow
|
|
|
- st.flow.add(sc.initialWindowSize)
|
|
|
- st.cw.Init() // make Cond use its Mutex, without heap-promoting them separately
|
|
|
if f.StreamEnded() {
|
|
|
st.state = stateHalfClosedRemote
|
|
|
}
|
|
|
+ st.cw.Init()
|
|
|
+
|
|
|
+ st.flow.conn = &sc.flow // link to conn-level counter
|
|
|
+ st.flow.add(sc.initialWindowSize)
|
|
|
+ st.inflow.conn = &sc.inflow // link to conn-level counter
|
|
|
+ st.inflow.add(initialWindowSize) // TODO: update this when we send a higher initial window size in the initial settings
|
|
|
+
|
|
|
sc.streams[id] = st
|
|
|
if f.HasPriority() {
|
|
|
sc.adjustStreamPriority(st.id, f.Priority)
|
|
|
@@ -1457,23 +1461,42 @@ func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
|
|
|
sc.serveG.check()
|
|
|
// "The legal range for the increment to the flow control
|
|
|
// window is 1 to 2^31-1 (2,147,483,647) octets."
|
|
|
+ // A Go Read call on 64-bit machines could in theory read
|
|
|
+ // a larger Read than this. Very unlikely, but we handle it here
|
|
|
+ // rather than elsewhere for now.
|
|
|
+ const maxUint31 = 1<<31 - 1
|
|
|
+ for n >= maxUint31 {
|
|
|
+ sc.sendWindowUpdate32(st, maxUint31)
|
|
|
+ n -= maxUint31
|
|
|
+ }
|
|
|
+ sc.sendWindowUpdate32(st, int32(n))
|
|
|
+}
|
|
|
+
|
|
|
+// st may be nil for conn-level
|
|
|
+func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
|
|
|
+ sc.serveG.check()
|
|
|
+ if n == 0 {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if n < 0 {
|
|
|
+ panic("negative update")
|
|
|
+ }
|
|
|
var streamID uint32
|
|
|
if st != nil {
|
|
|
streamID = st.id
|
|
|
}
|
|
|
- const maxUint31 = 1<<31 - 1
|
|
|
- for n >= maxUint31 {
|
|
|
- sc.writeFrame(frameWriteMsg{
|
|
|
- write: writeWindowUpdate{streamID: streamID, n: maxUint31},
|
|
|
- stream: st,
|
|
|
- })
|
|
|
- n -= maxUint31
|
|
|
+ sc.writeFrame(frameWriteMsg{
|
|
|
+ write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
|
|
|
+ stream: st,
|
|
|
+ })
|
|
|
+ var ok bool
|
|
|
+ if st == nil {
|
|
|
+ ok = sc.inflow.add(n)
|
|
|
+ } else {
|
|
|
+ ok = st.inflow.add(n)
|
|
|
}
|
|
|
- if n > 0 {
|
|
|
- sc.writeFrame(frameWriteMsg{
|
|
|
- write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
|
|
|
- stream: st,
|
|
|
- })
|
|
|
+ if !ok {
|
|
|
+ panic("internal error; sent too many window updates without decrements?")
|
|
|
}
|
|
|
}
|
|
|
|