|
|
@@ -307,10 +307,9 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
|
|
|
|
|
|
// The net/http package sets the write deadline from the
|
|
|
// http.Server.WriteTimeout during the TLS handshake, but then
|
|
|
- // passes the connection off to us with the deadline already
|
|
|
- // set. Disarm it here so that it is not applied to additional
|
|
|
- // streams opened on this connection.
|
|
|
- // TODO: implement WriteTimeout fully. See Issue 18437.
|
|
|
+ // passes the connection off to us with the deadline already set.
|
|
|
+ // Write deadlines are set per stream in serverConn.newStream.
|
|
|
+ // Disarm the net.Conn write deadline here.
|
|
|
if sc.hs.WriteTimeout != 0 {
|
|
|
sc.conn.SetWriteDeadline(time.Time{})
|
|
|
}
|
|
|
@@ -493,9 +492,10 @@ type stream struct {
|
|
|
numTrailerValues int64
|
|
|
weight uint8
|
|
|
state streamState
|
|
|
- resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
|
|
|
- gotTrailerHeader bool // HEADER frame for trailers was seen
|
|
|
- wroteHeaders bool // whether we wrote headers (not status 100)
|
|
|
+ resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
|
|
|
+ gotTrailerHeader bool // HEADER frame for trailers was seen
|
|
|
+ wroteHeaders bool // whether we wrote headers (not status 100)
|
|
|
+ writeDeadline *time.Timer // nil if unused
|
|
|
|
|
|
trailer http.Header // accumulated trailers
|
|
|
reqTrailer http.Header // handler's Request.Trailer
|
|
|
@@ -766,6 +766,10 @@ func (sc *serverConn) serve() {
|
|
|
loopNum++
|
|
|
select {
|
|
|
case wr := <-sc.wantWriteFrameCh:
|
|
|
+ if se, ok := wr.write.(StreamError); ok {
|
|
|
+ sc.resetStream(se)
|
|
|
+ break
|
|
|
+ }
|
|
|
sc.writeFrame(wr)
|
|
|
case spr := <-sc.wantStartPushCh:
|
|
|
sc.startPush(spr)
|
|
|
@@ -1336,6 +1340,9 @@ func (sc *serverConn) closeStream(st *stream, err error) {
|
|
|
panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
|
|
|
}
|
|
|
st.state = stateClosed
|
|
|
+ if st.writeDeadline != nil {
|
|
|
+ st.writeDeadline.Stop()
|
|
|
+ }
|
|
|
if st.isPushed() {
|
|
|
sc.curPushedStreams--
|
|
|
} else {
|
|
|
@@ -1574,6 +1581,12 @@ func (st *stream) copyTrailersToHandlerRequest() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// onWriteTimeout is run on its own goroutine (from time.AfterFunc)
|
|
|
+// when the stream's WriteTimeout has fired.
|
|
|
+func (st *stream) onWriteTimeout() {
|
|
|
+ st.sc.writeFrameFromHandler(FrameWriteRequest{write: streamError(st.id, ErrCodeInternal)})
|
|
|
+}
|
|
|
+
|
|
|
func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
|
|
|
sc.serveG.check()
|
|
|
id := f.StreamID
|
|
|
@@ -1753,6 +1766,9 @@ func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream
|
|
|
st.flow.add(sc.initialStreamSendWindowSize)
|
|
|
st.inflow.conn = &sc.inflow // link to conn-level counter
|
|
|
st.inflow.add(sc.srv.initialStreamRecvWindowSize())
|
|
|
+ if sc.hs.WriteTimeout != 0 {
|
|
|
+ st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
|
|
|
+ }
|
|
|
|
|
|
sc.streams[id] = st
|
|
|
sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID})
|