Browse Source

http2: fix Transport cancelation problems

The Transport code wasn't always sending a RST_STREAM to the server
in a few cases, and was also forgetting to flush its buffer.

I believe each part of this change was part of making golang/go#13556
not flaky. No additional tests at the moment;
TestTransportAndServerSharedBodyRace_h2 is pretty comprehensive as-is,
at least for an integration test. I should probably add some
finer-grained tests in the future.

Updates golang/go#13556

Change-Id: Ie824202b20ba6ee85b20edeeee86ec977e6daeb2
Reviewed-on: https://go-review.googlesource.com/18288
Reviewed-by: Andrew Gerrand <adg@golang.org>
Brad Fitzpatrick 10 years ago
parent
commit
d1ba260648
1 changed files with 56 additions and 7 deletions
  1. 56 7
      http2/transport.go

+ 56 - 7
http2/transport.go

@@ -163,6 +163,8 @@ type clientStream struct {
 	peerReset chan struct{} // closed on peer reset
 	resetErr  error         // populated before peerReset is closed
 
+	done chan struct{} // closed when stream remove from cc.streams map; close calls guarded by cc.mu
+
 	// owned by clientConnReadLoop:
 	headersDone  bool // got HEADERS w/ END_HEADERS
 	trailersDone bool // got second HEADERS frame w/ END_HEADERS
@@ -171,7 +173,11 @@ type clientStream struct {
 	resTrailer http.Header // client's Response.Trailer
 }
 
-// awaitRequestCancel runs in its own goroutine and waits for the user's
+// awaitRequestCancel runs in its own goroutine and waits for the user
+// to either cancel a RoundTrip request (using the provided
+// Request.Cancel channel), or for the request to be done (any way it
+// might be removed from the cc.streams map: peer reset, successful
+// completion, TCP connection breakage, etc)
 func (cs *clientStream) awaitRequestCancel(cancel <-chan struct{}) {
 	if cancel == nil {
 		return
@@ -179,7 +185,8 @@ func (cs *clientStream) awaitRequestCancel(cancel <-chan struct{}) {
 	select {
 	case <-cancel:
 		cs.bufPipe.CloseWithError(errRequestCanceled)
-	case <-cs.bufPipe.Done():
+		cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
+	case <-cs.done:
 	}
 }
 
@@ -547,7 +554,9 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
 		cs.requestedGzip = true
 	}
 
-	// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,}
+	// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
+	// sent by writeRequestBody below, along with any Trailers,
+	// again in form HEADERS{1}, CONTINUATION{0,})
 	hdrs := cc.encodeHeaders(req, cs.requestedGzip, trailers)
 	cc.wmu.Lock()
 	endStream := !hasBody && !hasTrailers
@@ -556,6 +565,12 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
 	cc.mu.Unlock()
 
 	if werr != nil {
+		if hasBody {
+			req.Body.Close() // per RoundTripper contract
+		}
+		cc.forgetStreamID(cs.ID)
+		// Don't bother sending a RST_STREAM (our write already failed;
+		// no need to keep writing)
 		return nil, werr
 	}
 
@@ -567,9 +582,12 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
 		}()
 	}
 
+	readLoopResCh := cs.resc
+	requestCanceledCh := requestCancel(req)
+	requestCanceled := false
 	for {
 		select {
-		case re := <-cs.resc:
+		case re := <-readLoopResCh:
 			res := re.res
 			if re.err != nil || res.StatusCode > 299 {
 				// On error or status code 3xx, 4xx, 5xx, etc abort any
@@ -584,17 +602,40 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
 				cs.abortRequestBodyWrite()
 			}
 			if re.err != nil {
+				cc.forgetStreamID(cs.ID)
 				return nil, re.err
 			}
 			res.Request = req
 			res.TLS = cc.tlsState
 			return res, nil
-		case <-requestCancel(req):
+		case <-requestCanceledCh:
+			cc.forgetStreamID(cs.ID)
 			cs.abortRequestBodyWrite()
-			return nil, errRequestCanceled
+			if !hasBody {
+				cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
+				return nil, errRequestCanceled
+			}
+			// If we have a body, wait for the body write to be
+			// finished before sending the RST_STREAM frame.
+			requestCanceled = true
+			requestCanceledCh = nil // to prevent spins
+			readLoopResCh = nil     // ignore responses at this point
 		case <-cs.peerReset:
+			if requestCanceled {
+				// They hung up on us first. No need to write a RST_STREAM.
+				// But prioritize the request canceled error value, since
+				// it's likely related. (same spirit as http1 code)
+				return nil, errRequestCanceled
+			}
+			// processResetStream already removed the
+			// stream from the streams map; no need for
+			// forgetStreamID.
 			return nil, cs.resetErr
 		case err := <-bodyCopyErrc:
+			if requestCanceled {
+				cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
+				return nil, errRequestCanceled
+			}
 			if err != nil {
 				return nil, err
 			}
@@ -848,6 +889,7 @@ func (cc *ClientConn) newStream() *clientStream {
 		ID:        cc.nextStreamID,
 		resc:      make(chan resAndError, 1),
 		peerReset: make(chan struct{}),
+		done:      make(chan struct{}),
 	}
 	cs.flow.add(int32(cc.initialWindowSize))
 	cs.flow.setConnFlow(&cc.flow)
@@ -858,12 +900,17 @@ func (cc *ClientConn) newStream() *clientStream {
 	return cs
 }
 
+func (cc *ClientConn) forgetStreamID(id uint32) {
+	cc.streamByID(id, true)
+}
+
 func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream {
 	cc.mu.Lock()
 	defer cc.mu.Unlock()
 	cs := cc.streams[id]
-	if andRemove {
+	if andRemove && cs != nil {
 		delete(cc.streams, id)
+		close(cs.done)
 	}
 	return cs
 }
@@ -921,6 +968,7 @@ func (rl *clientConnReadLoop) cleanup() {
 		case cs.resc <- resAndError{err: err}:
 		default:
 		}
+		close(cs.done)
 	}
 	cc.closed = true
 	cc.cond.Broadcast()
@@ -1313,6 +1361,7 @@ func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error)
 	// But that's only in GOAWAY. Invent a new frame type? Is there one already?
 	cc.wmu.Lock()
 	cc.fr.WriteRSTStream(streamID, code)
+	cc.bw.Flush()
 	cc.wmu.Unlock()
 }