|
|
@@ -110,11 +110,38 @@ type Server struct {
|
|
|
// activity for the purposes of IdleTimeout.
|
|
|
IdleTimeout time.Duration
|
|
|
|
|
|
+ // MaxUploadBufferPerConnection is the size of the initial flow
|
|
|
+ // control window for each connections. The HTTP/2 spec does not
|
|
|
+ // allow this to be smaller than 65535 or larger than 2^32-1.
|
|
|
+ // If the value is outside this range, a default value will be
|
|
|
+ // used instead.
|
|
|
+ MaxUploadBufferPerConnection int32
|
|
|
+
|
|
|
+ // MaxUploadBufferPerStream is the size of the initial flow control
|
|
|
+ // window for each stream. The HTTP/2 spec does not allow this to
|
|
|
+ // be larger than 2^32-1. If the value is zero or larger than the
|
|
|
+ // maximum, a default value will be used instead.
|
|
|
+ MaxUploadBufferPerStream int32
|
|
|
+
|
|
|
// NewWriteScheduler constructs a write scheduler for a connection.
|
|
|
// If nil, a default scheduler is chosen.
|
|
|
NewWriteScheduler func() WriteScheduler
|
|
|
}
|
|
|
|
|
|
+func (s *Server) initialConnRecvWindowSize() int32 {
|
|
|
+ if s.MaxUploadBufferPerConnection > initialWindowSize {
|
|
|
+ return s.MaxUploadBufferPerConnection
|
|
|
+ }
|
|
|
+ return 1 << 20
|
|
|
+}
|
|
|
+
|
|
|
+func (s *Server) initialStreamRecvWindowSize() int32 {
|
|
|
+ if s.MaxUploadBufferPerStream > 0 {
|
|
|
+ return s.MaxUploadBufferPerStream
|
|
|
+ }
|
|
|
+ return 1 << 20
|
|
|
+}
|
|
|
+
|
|
|
func (s *Server) maxReadFrameSize() uint32 {
|
|
|
if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize {
|
|
|
return v
|
|
|
@@ -255,27 +282,27 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
|
|
|
defer cancel()
|
|
|
|
|
|
sc := &serverConn{
|
|
|
- srv: s,
|
|
|
- hs: opts.baseConfig(),
|
|
|
- conn: c,
|
|
|
- baseCtx: baseCtx,
|
|
|
- remoteAddrStr: c.RemoteAddr().String(),
|
|
|
- bw: newBufferedWriter(c),
|
|
|
- handler: opts.handler(),
|
|
|
- streams: make(map[uint32]*stream),
|
|
|
- readFrameCh: make(chan readFrameResult),
|
|
|
- wantWriteFrameCh: make(chan FrameWriteRequest, 8),
|
|
|
- wantStartPushCh: make(chan startPushRequest, 8),
|
|
|
- wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
|
|
|
- bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
|
|
|
- doneServing: make(chan struct{}),
|
|
|
- clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
|
|
|
- advMaxStreams: s.maxConcurrentStreams(),
|
|
|
- initialWindowSize: initialWindowSize,
|
|
|
- maxFrameSize: initialMaxFrameSize,
|
|
|
- headerTableSize: initialHeaderTableSize,
|
|
|
- serveG: newGoroutineLock(),
|
|
|
- pushEnabled: true,
|
|
|
+ srv: s,
|
|
|
+ hs: opts.baseConfig(),
|
|
|
+ conn: c,
|
|
|
+ baseCtx: baseCtx,
|
|
|
+ remoteAddrStr: c.RemoteAddr().String(),
|
|
|
+ bw: newBufferedWriter(c),
|
|
|
+ handler: opts.handler(),
|
|
|
+ streams: make(map[uint32]*stream),
|
|
|
+ readFrameCh: make(chan readFrameResult),
|
|
|
+ wantWriteFrameCh: make(chan FrameWriteRequest, 8),
|
|
|
+ wantStartPushCh: make(chan startPushRequest, 8),
|
|
|
+ wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
|
|
|
+ bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
|
|
|
+ doneServing: make(chan struct{}),
|
|
|
+ clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
|
|
|
+ advMaxStreams: s.maxConcurrentStreams(),
|
|
|
+ initialStreamSendWindowSize: initialWindowSize,
|
|
|
+ maxFrameSize: initialMaxFrameSize,
|
|
|
+ headerTableSize: initialHeaderTableSize,
|
|
|
+ serveG: newGoroutineLock(),
|
|
|
+ pushEnabled: true,
|
|
|
}
|
|
|
|
|
|
// The net/http package sets the write deadline from the
|
|
|
@@ -294,6 +321,9 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
|
|
|
sc.writeSched = NewRandomWriteScheduler()
|
|
|
}
|
|
|
|
|
|
+ // These start at the RFC-specified defaults. If there is a higher
|
|
|
+ // configured value for inflow, that will be updated when we send a
|
|
|
+ // WINDOW_UPDATE shortly after sending SETTINGS.
|
|
|
sc.flow.add(initialWindowSize)
|
|
|
sc.inflow.add(initialWindowSize)
|
|
|
sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
|
|
|
@@ -387,34 +417,34 @@ type serverConn struct {
|
|
|
writeSched WriteScheduler
|
|
|
|
|
|
// Everything following is owned by the serve loop; use serveG.check():
|
|
|
- serveG goroutineLock // used to verify funcs are on serve()
|
|
|
- pushEnabled bool
|
|
|
- sawFirstSettings bool // got the initial SETTINGS frame after the preface
|
|
|
- needToSendSettingsAck bool
|
|
|
- unackedSettings int // how many SETTINGS have we sent without ACKs?
|
|
|
- clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
|
|
|
- advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
|
|
|
- curClientStreams uint32 // number of open streams initiated by the client
|
|
|
- curPushedStreams uint32 // number of open streams initiated by server push
|
|
|
- maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
|
|
|
- maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
|
|
|
- streams map[uint32]*stream
|
|
|
- initialWindowSize int32
|
|
|
- maxFrameSize int32
|
|
|
- headerTableSize uint32
|
|
|
- peerMaxHeaderListSize uint32 // zero means unknown (default)
|
|
|
- canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
|
|
|
- writingFrame bool // started writing a frame (on serve goroutine or separate)
|
|
|
- writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
|
|
|
- needsFrameFlush bool // last frame write wasn't a flush
|
|
|
- inGoAway bool // we've started to or sent GOAWAY
|
|
|
- inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
|
|
|
- needToSendGoAway bool // we need to schedule a GOAWAY frame write
|
|
|
- goAwayCode ErrCode
|
|
|
- shutdownTimerCh <-chan time.Time // nil until used
|
|
|
- shutdownTimer *time.Timer // nil until used
|
|
|
- idleTimer *time.Timer // nil if unused
|
|
|
- idleTimerCh <-chan time.Time // nil if unused
|
|
|
+ serveG goroutineLock // used to verify funcs are on serve()
|
|
|
+ pushEnabled bool
|
|
|
+ sawFirstSettings bool // got the initial SETTINGS frame after the preface
|
|
|
+ needToSendSettingsAck bool
|
|
|
+ unackedSettings int // how many SETTINGS have we sent without ACKs?
|
|
|
+ clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
|
|
|
+ advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
|
|
|
+ curClientStreams uint32 // number of open streams initiated by the client
|
|
|
+ curPushedStreams uint32 // number of open streams initiated by server push
|
|
|
+ maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
|
|
|
+ maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
|
|
|
+ streams map[uint32]*stream
|
|
|
+ initialStreamSendWindowSize int32
|
|
|
+ maxFrameSize int32
|
|
|
+ headerTableSize uint32
|
|
|
+ peerMaxHeaderListSize uint32 // zero means unknown (default)
|
|
|
+ canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
|
|
|
+ writingFrame bool // started writing a frame (on serve goroutine or separate)
|
|
|
+ writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
|
|
|
+ needsFrameFlush bool // last frame write wasn't a flush
|
|
|
+ inGoAway bool // we've started to or sent GOAWAY
|
|
|
+ inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
|
|
|
+ needToSendGoAway bool // we need to schedule a GOAWAY frame write
|
|
|
+ goAwayCode ErrCode
|
|
|
+ shutdownTimerCh <-chan time.Time // nil until used
|
|
|
+ shutdownTimer *time.Timer // nil until used
|
|
|
+ idleTimer *time.Timer // nil if unused
|
|
|
+ idleTimerCh <-chan time.Time // nil if unused
|
|
|
|
|
|
// Owned by the writeFrameAsync goroutine:
|
|
|
headerWriteBuf bytes.Buffer
|
|
|
@@ -695,15 +725,17 @@ func (sc *serverConn) serve() {
|
|
|
{SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
|
|
|
{SettingMaxConcurrentStreams, sc.advMaxStreams},
|
|
|
{SettingMaxHeaderListSize, sc.maxHeaderListSize()},
|
|
|
-
|
|
|
- // TODO: more actual settings, notably
|
|
|
- // SettingInitialWindowSize, but then we also
|
|
|
- // want to bump up the conn window size the
|
|
|
- // same amount here right after the settings
|
|
|
+ {SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())},
|
|
|
},
|
|
|
})
|
|
|
sc.unackedSettings++
|
|
|
|
|
|
+ // Each connection starts with intialWindowSize inflow tokens.
|
|
|
+ // If a higher value is configured, we add more tokens.
|
|
|
+ if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
|
|
|
+ sc.sendWindowUpdate(nil, int(diff))
|
|
|
+ }
|
|
|
+
|
|
|
if err := sc.readPreface(); err != nil {
|
|
|
sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
|
|
|
return
|
|
|
@@ -1394,9 +1426,9 @@ func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
|
|
|
// adjust the size of all stream flow control windows that it
|
|
|
// maintains by the difference between the new value and the
|
|
|
// old value."
|
|
|
- old := sc.initialWindowSize
|
|
|
- sc.initialWindowSize = int32(val)
|
|
|
- growth := sc.initialWindowSize - old // may be negative
|
|
|
+ old := sc.initialStreamSendWindowSize
|
|
|
+ sc.initialStreamSendWindowSize = int32(val)
|
|
|
+ growth := int32(val) - old // may be negative
|
|
|
for _, st := range sc.streams {
|
|
|
if !st.flow.add(growth) {
|
|
|
// 6.9.2 Initial Flow Control Window Size
|
|
|
@@ -1718,9 +1750,9 @@ func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream
|
|
|
}
|
|
|
st.cw.Init()
|
|
|
st.flow.conn = &sc.flow // link to conn-level counter
|
|
|
- st.flow.add(sc.initialWindowSize)
|
|
|
- st.inflow.conn = &sc.inflow // link to conn-level counter
|
|
|
- st.inflow.add(initialWindowSize) // TODO: update this when we send a higher initial window size in the initial settings
|
|
|
+ st.flow.add(sc.initialStreamSendWindowSize)
|
|
|
+ st.inflow.conn = &sc.inflow // link to conn-level counter
|
|
|
+ st.inflow.add(sc.srv.initialStreamRecvWindowSize())
|
|
|
|
|
|
sc.streams[id] = st
|
|
|
sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID})
|