|
@@ -1537,10 +1537,27 @@ var errClosedResponseBody = errors.New("http2: response body closed")
|
|
|
|
|
|
|
|
func (b transportResponseBody) Close() error {
|
|
func (b transportResponseBody) Close() error {
|
|
|
cs := b.cs
|
|
cs := b.cs
|
|
|
- if cs.bufPipe.Err() != io.EOF {
|
|
|
|
|
- // TODO: write test for this
|
|
|
|
|
- cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
|
|
|
|
|
|
|
+ cc := cs.cc
|
|
|
|
|
+
|
|
|
|
|
+ serverSentStreamEnd := cs.bufPipe.Err() == io.EOF
|
|
|
|
|
+ unread := cs.bufPipe.Len()
|
|
|
|
|
+
|
|
|
|
|
+ if unread > 0 || !serverSentStreamEnd {
|
|
|
|
|
+ cc.mu.Lock()
|
|
|
|
|
+ cc.wmu.Lock()
|
|
|
|
|
+ if !serverSentStreamEnd {
|
|
|
|
|
+ cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel)
|
|
|
|
|
+ }
|
|
|
|
|
+ // Return connection-level flow control.
|
|
|
|
|
+ if unread > 0 {
|
|
|
|
|
+ cc.inflow.add(int32(unread))
|
|
|
|
|
+ cc.fr.WriteWindowUpdate(0, uint32(unread))
|
|
|
|
|
+ }
|
|
|
|
|
+ cc.bw.Flush()
|
|
|
|
|
+ cc.wmu.Unlock()
|
|
|
|
|
+ cc.mu.Unlock()
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
cs.bufPipe.BreakWithError(errClosedResponseBody)
|
|
cs.bufPipe.BreakWithError(errClosedResponseBody)
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
@@ -1548,6 +1565,7 @@ func (b transportResponseBody) Close() error {
|
|
|
func (rl *clientConnReadLoop) processData(f *DataFrame) error {
|
|
func (rl *clientConnReadLoop) processData(f *DataFrame) error {
|
|
|
cc := rl.cc
|
|
cc := rl.cc
|
|
|
cs := cc.streamByID(f.StreamID, f.StreamEnded())
|
|
cs := cc.streamByID(f.StreamID, f.StreamEnded())
|
|
|
|
|
+ data := f.Data()
|
|
|
if cs == nil {
|
|
if cs == nil {
|
|
|
cc.mu.Lock()
|
|
cc.mu.Lock()
|
|
|
neverSent := cc.nextStreamID
|
|
neverSent := cc.nextStreamID
|
|
@@ -1561,9 +1579,17 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
|
|
|
// TODO: be stricter here? only silently ignore things which
|
|
// TODO: be stricter here? only silently ignore things which
|
|
|
// we canceled, but not things which were closed normally
|
|
// we canceled, but not things which were closed normally
|
|
|
// by the peer? Tough without accumulating too much state.
|
|
// by the peer? Tough without accumulating too much state.
|
|
|
|
|
+
|
|
|
|
|
+ // But at least return their flow control:
|
|
|
|
|
+ if len(data) > 0 {
|
|
|
|
|
+ cc.wmu.Lock()
|
|
|
|
|
+ cc.fr.WriteWindowUpdate(0, uint32(len(data)))
|
|
|
|
|
+ cc.bw.Flush()
|
|
|
|
|
+ cc.wmu.Unlock()
|
|
|
|
|
+ }
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
- if data := f.Data(); len(data) > 0 {
|
|
|
|
|
|
|
+ if len(data) > 0 {
|
|
|
if cs.bufPipe.b == nil {
|
|
if cs.bufPipe.b == nil {
|
|
|
// Data frame after it's already closed?
|
|
// Data frame after it's already closed?
|
|
|
cc.logf("http2: Transport received DATA frame for closed stream; closing connection")
|
|
cc.logf("http2: Transport received DATA frame for closed stream; closing connection")
|
|
@@ -1730,8 +1756,10 @@ func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
|
|
func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
|
|
|
- // TODO: do something with err? send it as a debug frame to the peer?
|
|
|
|
|
- // But that's only in GOAWAY. Invent a new frame type? Is there one already?
|
|
|
|
|
|
|
+ // TODO: map err to more interesting error codes, once the
|
|
|
|
|
+ // HTTP community comes up with some. But currently for
|
|
|
|
|
+ // RST_STREAM there's no equivalent to GOAWAY frame's debug
|
|
|
|
|
+ // data, and the error codes are all pretty vague ("cancel").
|
|
|
cc.wmu.Lock()
|
|
cc.wmu.Lock()
|
|
|
cc.fr.WriteRSTStream(streamID, code)
|
|
cc.fr.WriteRSTStream(streamID, code)
|
|
|
cc.bw.Flush()
|
|
cc.bw.Flush()
|