|
@@ -195,18 +195,22 @@ type clientStream struct {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// awaitRequestCancel runs in its own goroutine and waits for the user
|
|
// awaitRequestCancel runs in its own goroutine and waits for the user
|
|
|
-// to either cancel a RoundTrip request (using the provided
|
|
|
|
|
-// Request.Cancel channel), or for the request to be done (any way it
|
|
|
|
|
-// might be removed from the cc.streams map: peer reset, successful
|
|
|
|
|
-// completion, TCP connection breakage, etc)
|
|
|
|
|
-func (cs *clientStream) awaitRequestCancel(cancel <-chan struct{}) {
|
|
|
|
|
- if cancel == nil {
|
|
|
|
|
|
|
+// to cancel a RoundTrip request, its context to expire, or for the
|
|
|
|
|
+// request to be done (any way it might be removed from the cc.streams
|
|
|
|
|
+// map: peer reset, successful completion, TCP connection breakage,
|
|
|
|
|
+// etc)
|
|
|
|
|
+func (cs *clientStream) awaitRequestCancel(req *http.Request) {
|
|
|
|
|
+ ctx := reqContext(req)
|
|
|
|
|
+ if req.Cancel == nil && ctx.Done() == nil {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
select {
|
|
select {
|
|
|
- case <-cancel:
|
|
|
|
|
|
|
+ case <-req.Cancel:
|
|
|
cs.bufPipe.CloseWithError(errRequestCanceled)
|
|
cs.bufPipe.CloseWithError(errRequestCanceled)
|
|
|
cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
|
|
cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
|
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
|
+ cs.bufPipe.CloseWithError(ctx.Err())
|
|
|
|
|
+ cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
|
|
|
case <-cs.done:
|
|
case <-cs.done:
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -684,6 +688,7 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
|
|
|
|
|
|
readLoopResCh := cs.resc
|
|
readLoopResCh := cs.resc
|
|
|
bodyWritten := false
|
|
bodyWritten := false
|
|
|
|
|
+ ctx := reqContext(req)
|
|
|
|
|
|
|
|
for {
|
|
for {
|
|
|
select {
|
|
select {
|
|
@@ -716,6 +721,14 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
|
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
|
|
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
|
|
|
}
|
|
}
|
|
|
return nil, errTimeout
|
|
return nil, errTimeout
|
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
|
+ cc.forgetStreamID(cs.ID)
|
|
|
|
|
+ if !hasBody || bodyWritten {
|
|
|
|
|
+ cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
|
|
|
|
|
+ } else {
|
|
|
|
|
+ cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
|
|
|
|
|
+ }
|
|
|
|
|
+ return nil, ctx.Err()
|
|
|
case <-req.Cancel:
|
|
case <-req.Cancel:
|
|
|
cc.forgetStreamID(cs.ID)
|
|
cc.forgetStreamID(cs.ID)
|
|
|
if !hasBody || bodyWritten {
|
|
if !hasBody || bodyWritten {
|
|
@@ -1284,13 +1297,14 @@ func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFra
|
|
|
cs.bufPipe = pipe{b: buf}
|
|
cs.bufPipe = pipe{b: buf}
|
|
|
cs.bytesRemain = res.ContentLength
|
|
cs.bytesRemain = res.ContentLength
|
|
|
res.Body = transportResponseBody{cs}
|
|
res.Body = transportResponseBody{cs}
|
|
|
- go cs.awaitRequestCancel(cs.req.Cancel)
|
|
|
|
|
|
|
+ go cs.awaitRequestCancel(cs.req)
|
|
|
|
|
|
|
|
if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
|
|
if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
|
|
|
res.Header.Del("Content-Encoding")
|
|
res.Header.Del("Content-Encoding")
|
|
|
res.Header.Del("Content-Length")
|
|
res.Header.Del("Content-Length")
|
|
|
res.ContentLength = -1
|
|
res.ContentLength = -1
|
|
|
res.Body = &gzipReader{body: res.Body}
|
|
res.Body = &gzipReader{body: res.Body}
|
|
|
|
|
+ setResponseUncompressed(res)
|
|
|
}
|
|
}
|
|
|
return res, nil
|
|
return res, nil
|
|
|
}
|
|
}
|