@@ -390,7 +390,6 @@ type serverConn struct {
goAwayCode ErrCode
shutdownTimerCh <-chan time.Time // nil until used
shutdownTimer *time.Timer // nil until used
- freeRequestBodyBuf []byte // if non-nil, a free initialWindowSize buffer for getRequestBodyBuf
 
// Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes.Buffer
@@ -434,11 +433,11 @@ type stream struct {
numTrailerValues int64
weight uint8
state streamState
- sentReset bool // only true once detached from streams map
- gotReset bool // only true once detacted from streams map
- gotTrailerHeader bool // HEADER frame for trailers was seen
- wroteHeaders bool // whether we wrote headers (not status 100)
- reqBuf []byte
+ sentReset bool // only true once detached from streams map
+ gotReset bool // only true once detached from streams map
+ gotTrailerHeader bool // HEADER frame for trailers was seen
+ wroteHeaders bool // whether we wrote headers (not status 100)
+ reqBuf []byte // if non-nil, body pipe buffer to return later at EOF
 
trailer http.Header // accumulated trailers
reqTrailer http.Header // handler's Request.Trailer
@@ -916,11 +915,11 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) {
// Here we would go to stateHalfClosedLocal in
// theory, but since our handler is done and
// the net/http package provides no mechanism
- // for finishing writing to a ResponseWriter
- // while still reading data (see possible TODO
- // at top of this file), we go into closed
- // state here anyway, after telling the peer
- // we're hanging up on them.
+ // for closing a ResponseWriter while still
+ // reading data (see possible TODO at top of
+ // this file), we go into closed state here
+ // anyway, after telling the peer we're
+ // hanging up on them.
st.state = stateHalfClosedLocal // won't last long, but necessary for closeStream via resetStream
errCancel := streamError(st.id, ErrCodeCancel)
sc.resetStream(errCancel)
@@ -1184,18 +1183,6 @@ func (sc *serverConn) closeStream(st *stream, err error) {
}
st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
sc.writeSched.forgetStream(st.id)
- if st.reqBuf != nil {
- // Stash this request body buffer (64k) away for reuse
- // by a future POST/PUT/etc.
- //
- // TODO(bradfitz): share on the server? sync.Pool?
- // Server requires locks and might hurt contention.
- // sync.Pool might work, or might be worse, depending
- // on goroutine CPU migrations. (get and put on
- // separate CPUs). Maybe a mix of strategies. But
- // this is an easy win for now.
- sc.freeRequestBodyBuf = st.reqBuf
- }
}
 
func (sc *serverConn) processSettings(f *SettingsFrame) error {
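
The comment removed above floats sync.Pool as one way to share these buffers across the whole server; this patch instead adds a small channel-based cache (see the getRequestBodyBuf hunk below). For comparison only, a sync.Pool variant might look roughly like this sketch (the names reqBufPool, getBuf and putBuf are hypothetical; it assumes the package's initialWindowSize constant and an import of "sync"):

var reqBufPool = sync.Pool{
	// New allocates a fresh ~64 KB buffer when the pool has nothing cached.
	New: func() interface{} { return make([]byte, initialWindowSize) },
}

func getBuf() []byte  { return reqBufPool.Get().([]byte) }
func putBuf(b []byte) { reqBufPool.Put(b) }

One caveat with this shape: putting a []byte into an interface{} boxes the slice header on every Put, so a production version would probably wrap the buffer in a small pointer type. A Pool can also be emptied by the garbage collector, whereas the channel cache below holds on to its (at most eight) buffers until they are reused.
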
@@ -1672,13 +1659,9 @@ func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*res
}
req = requestWithContext(req, st.ctx)
if bodyOpen {
- // Disabled, per golang.org/issue/14960:
- // st.reqBuf = sc.getRequestBodyBuf()
- // TODO: remove this 64k of garbage per request (again, but without a data race):
- buf := make([]byte, initialWindowSize)
-
+ st.reqBuf = getRequestBodyBuf()
body.pipe = &pipe{
- b: &fixedBuffer{buf: buf},
+ b: &fixedBuffer{buf: st.reqBuf},
}
 
if vv, ok := header["Content-Length"]; ok {
@@ -1702,13 +1685,22 @@ func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*res
return rw, req, nil
}
 
-func (sc *serverConn) getRequestBodyBuf() []byte {
- sc.serveG.check()
- if buf := sc.freeRequestBodyBuf; buf != nil {
- sc.freeRequestBodyBuf = nil
- return buf
+var reqBodyCache = make(chan []byte, 8)
+
+func getRequestBodyBuf() []byte {
+ select {
+ case b := <-reqBodyCache:
+ return b
+ default:
+ return make([]byte, initialWindowSize)
+ }
+}
+
+func putRequestBodyBuf(b []byte) {
+ select {
+ case reqBodyCache <- b:
+ default:
}
- return make([]byte, initialWindowSize)
}
 
// Run on its own goroutine.
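
The buffered channel above is a fixed-capacity free list: a get takes an idle buffer if one is cached and otherwise allocates, while a put keeps at most eight buffers and silently drops any extras, so the cache never pins more than eight of these roughly 64 KB buffers. A standalone sketch of the same pattern (hypothetical names, outside this package, with capacity 2 to keep the demo short):

package main

import "fmt"

const bufSize = 64 << 10 // stand-in for the package's initialWindowSize

var cache = make(chan []byte, 2) // capacity bounds how many idle buffers are retained

func get() []byte {
	select {
	case b := <-cache:
		return b // reuse an idle buffer
	default:
		return make([]byte, bufSize) // cache empty: allocate a new one
	}
}

func put(b []byte) {
	select {
	case cache <- b: // keep it for a later request
	default: // cache already full: drop it for the GC
	}
}

func main() {
	a, b, c := get(), get(), get() // cache starts empty, so three allocations
	put(a)
	put(b)
	put(c)                  // dropped: the cache already holds two
	fmt.Println(len(cache)) // 2
}

Unlike a sync.Pool, nothing here is tied to GC cycles, but every connection in the process shares the one channel, so concurrent gets and puts serialize on its internal lock.
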
@@ -1796,11 +1788,19 @@ type bodyReadMsg struct {
// called from handler goroutines.
// Notes that the handler for the given stream ID read n bytes of its body
// and schedules flow control tokens to be sent.
-func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int) {
+func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {
sc.serveG.checkNotOn() // NOT on
- select {
- case sc.bodyReadCh <- bodyReadMsg{st, n}:
- case <-sc.doneServing:
+ if n > 0 {
+ select {
+ case sc.bodyReadCh <- bodyReadMsg{st, n}:
+ case <-sc.doneServing:
+ }
+ }
+ if err == io.EOF {
+ if buf := st.reqBuf; buf != nil {
+ st.reqBuf = nil // shouldn't matter; field unused by others after this point
+ putRequestBodyBuf(buf)
+ }
}
}
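
With this change a request-body buffer goes back to the cache only when the handler's reads reach io.EOF; a handler that abandons the body early simply leaves its buffer to the garbage collector. An illustrative handler that drains and discards an upload, using only the public net/http API (nothing below depends on this package's internals):

package main

import (
	"io"
	"io/ioutil"
	"net/http"
)

func uploadHandler(w http.ResponseWriter, r *http.Request) {
	// Draining the body to EOF is what lets the HTTP/2 server recycle the
	// request-body buffer (per the hunk above) instead of discarding it.
	if _, err := io.Copy(ioutil.Discard, r.Body); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

func main() {
	http.HandleFunc("/upload", uploadHandler)
	// A plain listener is shown for brevity; the path discussed above only
	// applies when the server is actually speaking HTTP/2.
	http.ListenAndServe(":8080", nil)
}
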
 
@@ -1883,9 +1883,10 @@ func (b *requestBody) Read(p []byte) (n int, err error) {
return 0, io.EOF
}
n, err = b.pipe.Read(p)
- if n > 0 {
- b.conn.noteBodyReadFromHandler(b.stream, n)
+ if err == io.EOF {
+ b.pipe = nil
}
+ b.conn.noteBodyReadFromHandler(b.stream, n, err)
return
}
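
One subtlety worth noting: the io.Reader contract permits a read to return n > 0 together with io.EOF on the same call. Whether or not this pipe ever does that, the reworked noteBodyReadFromHandler treats the two concerns independently (n > 0 schedules flow-control tokens, err == io.EOF returns the buffer), so a combined final read would still be handled. A tiny demonstration of such a reader, built from the standard library:

package main

import (
	"fmt"
	"strings"
	"testing/iotest"
)

func main() {
	// DataErrReader delivers the final error together with the final data.
	r := iotest.DataErrReader(strings.NewReader("hi"))
	n, err := r.Read(make([]byte, 8))
	fmt.Println(n, err) // 2 EOF
}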