|
|
@@ -424,12 +424,12 @@ func (sc *serverConn) readFrames() {
|
|
|
// At most one goroutine can be running writeFrameAsync at a time per
|
|
|
// serverConn.
|
|
|
func (sc *serverConn) writeFrameAsync(wm frameWriteMsg) {
|
|
|
- err := wm.write(sc, wm.v)
|
|
|
+ err := wm.write.writeFrame(sc)
|
|
|
if ch := wm.done; ch != nil {
|
|
|
select {
|
|
|
case ch <- err:
|
|
|
default:
|
|
|
- panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wm.v))
|
|
|
+ panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wm.write))
|
|
|
}
|
|
|
}
|
|
|
sc.wroteFrameCh <- struct{}{} // tickle frame selection scheduler
|
|
|
@@ -459,8 +459,7 @@ func (sc *serverConn) serve() {
|
|
|
sc.vlogf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
|
|
|
|
|
|
sc.writeFrame(frameWriteMsg{
|
|
|
- write: writeSettings,
|
|
|
- v: []Setting{
|
|
|
+ write: writeSettings{
|
|
|
{SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
|
|
|
{SettingMaxConcurrentStreams, sc.advMaxStreams},
|
|
|
/* TODO: more actual settings */
|
|
|
@@ -533,10 +532,10 @@ func (sc *serverConn) readPreface() error {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// writeData writes the data described in req to stream.id.
|
|
|
+// writeDataFromHandler writes the data described in req to stream.id.
|
|
|
//
|
|
|
// The provided ch is used to avoid allocating new channels for each
|
|
|
-// write operation. It's expected that the caller reuses req and ch
|
|
|
+// write operation. It's expected that the caller reuses writeData and ch
|
|
|
// over time.
|
|
|
//
|
|
|
// The flow control currently happens in the Handler where it waits
|
|
|
@@ -545,14 +544,11 @@ func (sc *serverConn) readPreface() error {
|
|
|
// change when priority is implemented, so the serve goroutine knows
|
|
|
// the total amount of bytes waiting to be sent and can can have more
|
|
|
// scheduling decisions available.
|
|
|
-func (sc *serverConn) writeData(stream *stream, data *dataWriteParams, ch chan error) error {
|
|
|
+func (sc *serverConn) writeDataFromHandler(stream *stream, writeData *writeData, ch chan error) error {
|
|
|
sc.writeFrameFromHandler(frameWriteMsg{
|
|
|
- write: writeDataFrame,
|
|
|
- cost: uint32(len(data.p)),
|
|
|
- stream: stream,
|
|
|
- endStream: data.end,
|
|
|
- v: data,
|
|
|
- done: ch,
|
|
|
+ write: writeData,
|
|
|
+ stream: stream,
|
|
|
+ done: ch,
|
|
|
})
|
|
|
select {
|
|
|
case err := <-ch:
|
|
|
@@ -620,9 +616,9 @@ func (sc *serverConn) startFrameWrite(wm frameWriteMsg) {
|
|
|
|
|
|
sc.writingFrame = true
|
|
|
sc.needsFrameFlush = true
|
|
|
- if wm.endStream {
|
|
|
+ if endsStream(wm.write) {
|
|
|
if st == nil {
|
|
|
- panic("nil stream with endStream set")
|
|
|
+ panic("internal error: expecting non-nil stream")
|
|
|
}
|
|
|
switch st.state {
|
|
|
case stateOpen:
|
|
|
@@ -654,8 +650,7 @@ func (sc *serverConn) scheduleFrameWrite() {
|
|
|
if sc.needToSendGoAway {
|
|
|
sc.needToSendGoAway = false
|
|
|
sc.startFrameWrite(frameWriteMsg{
|
|
|
- write: writeGoAwayFrame,
|
|
|
- v: &goAwayParams{
|
|
|
+ write: &writeGoAway{
|
|
|
maxStreamID: sc.maxStreamID,
|
|
|
code: sc.goAwayCode,
|
|
|
},
|
|
|
@@ -663,7 +658,7 @@ func (sc *serverConn) scheduleFrameWrite() {
|
|
|
return
|
|
|
}
|
|
|
if sc.writeSched.empty() && sc.needsFrameFlush {
|
|
|
- sc.startFrameWrite(frameWriteMsg{write: flushFrameWriter})
|
|
|
+ sc.startFrameWrite(frameWriteMsg{write: flushFrameWriter{}})
|
|
|
sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
|
|
|
return
|
|
|
}
|
|
|
@@ -673,7 +668,7 @@ func (sc *serverConn) scheduleFrameWrite() {
|
|
|
}
|
|
|
if sc.needToSendSettingsAck {
|
|
|
sc.needToSendSettingsAck = false
|
|
|
- sc.startFrameWrite(frameWriteMsg{write: writeSettingsAck})
|
|
|
+ sc.startFrameWrite(frameWriteMsg{write: writeSettingsAck{}})
|
|
|
return
|
|
|
}
|
|
|
if sc.writeSched.empty() {
|
|
|
@@ -715,10 +710,7 @@ func (sc *serverConn) resetStream(se StreamError) {
|
|
|
if !ok {
|
|
|
panic("internal package error; resetStream called on non-existent stream")
|
|
|
}
|
|
|
- sc.writeFrame(frameWriteMsg{
|
|
|
- write: writeRSTStreamFrame,
|
|
|
- v: &se,
|
|
|
- })
|
|
|
+ sc.writeFrame(frameWriteMsg{write: se})
|
|
|
st.sentReset = true
|
|
|
sc.closeStream(st, se)
|
|
|
}
|
|
|
@@ -845,10 +837,7 @@ func (sc *serverConn) processPing(f *PingFrame) error {
|
|
|
// PROTOCOL_ERROR."
|
|
|
return ConnectionError(ErrCodeProtocol)
|
|
|
}
|
|
|
- sc.writeFrame(frameWriteMsg{
|
|
|
- write: writePingAck,
|
|
|
- v: f,
|
|
|
- })
|
|
|
+ sc.writeFrame(frameWriteMsg{write: writePingAck{f}})
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
@@ -1227,23 +1216,12 @@ func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request) {
|
|
|
sc.handler.ServeHTTP(rw, req)
|
|
|
}
|
|
|
|
|
|
-// headerWriteReq is a request to write an HTTP response header from a server Handler.
|
|
|
-type headerWriteReq struct {
|
|
|
- stream *stream
|
|
|
- httpResCode int
|
|
|
- h http.Header // may be nil
|
|
|
- endStream bool
|
|
|
-
|
|
|
- contentType string
|
|
|
- contentLength string
|
|
|
-}
|
|
|
-
|
|
|
// called from handler goroutines.
|
|
|
// h may be nil.
|
|
|
-func (sc *serverConn) writeHeaders(req headerWriteReq, tempCh chan error) {
|
|
|
+func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders, tempCh chan error) {
|
|
|
sc.serveG.checkNotOn() // NOT on
|
|
|
var errc chan error
|
|
|
- if req.h != nil {
|
|
|
+ if headerData.h != nil {
|
|
|
// If there's a header map (which we don't own), so we have to block on
|
|
|
// waiting for this frame to be written, so an http.Flush mid-handler
|
|
|
// writes out the correct value of keys, before a handler later potentially
|
|
|
@@ -1251,11 +1229,9 @@ func (sc *serverConn) writeHeaders(req headerWriteReq, tempCh chan error) {
|
|
|
errc = tempCh
|
|
|
}
|
|
|
sc.writeFrameFromHandler(frameWriteMsg{
|
|
|
- write: writeHeadersFrame,
|
|
|
- v: req,
|
|
|
- stream: req.stream,
|
|
|
- done: errc,
|
|
|
- endStream: req.endStream,
|
|
|
+ write: headerData,
|
|
|
+ stream: st,
|
|
|
+ done: errc,
|
|
|
})
|
|
|
if errc != nil {
|
|
|
select {
|
|
|
@@ -1271,8 +1247,7 @@ func (sc *serverConn) writeHeaders(req headerWriteReq, tempCh chan error) {
|
|
|
// called from handler goroutines.
|
|
|
func (sc *serverConn) write100ContinueHeaders(st *stream) {
|
|
|
sc.writeFrameFromHandler(frameWriteMsg{
|
|
|
- write: write100ContinueHeadersFrame,
|
|
|
- v: st,
|
|
|
+ write: write100ContinueHeadersFrame{st.id},
|
|
|
stream: st,
|
|
|
})
|
|
|
}
|
|
|
@@ -1285,16 +1260,14 @@ func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
|
|
|
const maxUint32 = 2147483647
|
|
|
for n >= maxUint32 {
|
|
|
sc.writeFrameFromHandler(frameWriteMsg{
|
|
|
- write: writeWindowUpdate,
|
|
|
- v: windowUpdateReq{streamID: st.id, n: maxUint32},
|
|
|
+ write: writeWindowUpdate{streamID: st.id, n: maxUint32},
|
|
|
stream: st,
|
|
|
})
|
|
|
n -= maxUint32
|
|
|
}
|
|
|
if n > 0 {
|
|
|
sc.writeFrameFromHandler(frameWriteMsg{
|
|
|
- write: writeWindowUpdate,
|
|
|
- v: windowUpdateReq{streamID: st.id, n: uint32(n)},
|
|
|
+ write: writeWindowUpdate{streamID: st.id, n: uint32(n)},
|
|
|
stream: st,
|
|
|
})
|
|
|
}
|
|
|
@@ -1363,7 +1336,7 @@ type responseWriterState struct {
|
|
|
wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
|
|
|
sentHeader bool // have we sent the header frame?
|
|
|
handlerDone bool // handler has finished
|
|
|
- curWrite dataWriteParams
|
|
|
+ curWrite writeData
|
|
|
frameWriteCh chan error // re-used whenever we need to block on a frame being written
|
|
|
|
|
|
closeNotifierMu sync.Mutex // guards closeNotifierCh
|
|
|
@@ -1373,8 +1346,8 @@ type responseWriterState struct {
|
|
|
func (rws *responseWriterState) writeData(p []byte, end bool) error {
|
|
|
rws.curWrite.streamID = rws.stream.id
|
|
|
rws.curWrite.p = p
|
|
|
- rws.curWrite.end = end
|
|
|
- return rws.stream.conn.writeData(rws.stream, &rws.curWrite, rws.frameWriteCh)
|
|
|
+ rws.curWrite.endStream = end
|
|
|
+ return rws.stream.conn.writeDataFromHandler(rws.stream, &rws.curWrite, rws.frameWriteCh)
|
|
|
}
|
|
|
|
|
|
type chunkWriter struct{ rws *responseWriterState }
|
|
|
@@ -1401,8 +1374,8 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
|
|
|
ctype = http.DetectContentType(p)
|
|
|
}
|
|
|
endStream := rws.handlerDone && len(p) == 0
|
|
|
- rws.stream.conn.writeHeaders(headerWriteReq{
|
|
|
- stream: rws.stream,
|
|
|
+ rws.stream.conn.writeHeaders(rws.stream, &writeResHeaders{
|
|
|
+ streamID: rws.stream.id,
|
|
|
httpResCode: rws.status,
|
|
|
h: rws.snapHeader,
|
|
|
endStream: endStream,
|