|
|
@@ -1376,17 +1376,12 @@ func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream {
|
|
|
// clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
|
|
|
type clientConnReadLoop struct {
|
|
|
cc *ClientConn
|
|
|
- activeRes map[uint32]*clientStream // keyed by streamID
|
|
|
closeWhenIdle bool
|
|
|
}
|
|
|
|
|
|
// readLoop runs in its own goroutine and reads and dispatches frames.
|
|
|
func (cc *ClientConn) readLoop() {
|
|
|
- rl := &clientConnReadLoop{
|
|
|
- cc: cc,
|
|
|
- activeRes: make(map[uint32]*clientStream),
|
|
|
- }
|
|
|
-
|
|
|
+ rl := &clientConnReadLoop{cc: cc}
|
|
|
defer rl.cleanup()
|
|
|
cc.readerErr = rl.run()
|
|
|
if ce, ok := cc.readerErr.(ConnectionError); ok {
|
|
|
@@ -1441,10 +1436,8 @@ func (rl *clientConnReadLoop) cleanup() {
|
|
|
} else if err == io.EOF {
|
|
|
err = io.ErrUnexpectedEOF
|
|
|
}
|
|
|
- for _, cs := range rl.activeRes {
|
|
|
- cs.bufPipe.CloseWithError(err)
|
|
|
- }
|
|
|
for _, cs := range cc.streams {
|
|
|
+ cs.bufPipe.CloseWithError(err) // no-op if already closed
|
|
|
select {
|
|
|
case cs.resc <- resAndError{err: err}:
|
|
|
default:
|
|
|
@@ -1522,7 +1515,7 @@ func (rl *clientConnReadLoop) run() error {
|
|
|
}
|
|
|
return err
|
|
|
}
|
|
|
- if rl.closeWhenIdle && gotReply && maybeIdle && len(rl.activeRes) == 0 {
|
|
|
+ if rl.closeWhenIdle && gotReply && maybeIdle {
|
|
|
cc.closeIfIdle()
|
|
|
}
|
|
|
}
|
|
|
@@ -1578,9 +1571,6 @@ func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
|
|
|
// (nil, nil) special case. See handleResponse docs.
|
|
|
return nil
|
|
|
}
|
|
|
- if res.Body != noBody {
|
|
|
- rl.activeRes[cs.ID] = cs
|
|
|
- }
|
|
|
cs.resTrailer = &res.Trailer
|
|
|
cs.resc <- resAndError{res: res}
|
|
|
return nil
|
|
|
@@ -1919,7 +1909,6 @@ func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
|
|
|
rl.closeWhenIdle = true
|
|
|
}
|
|
|
cs.bufPipe.closeWithErrorAndCode(err, code)
|
|
|
- delete(rl.activeRes, cs.ID)
|
|
|
|
|
|
select {
|
|
|
case cs.resc <- resAndError{err: err}:
|
|
|
@@ -2046,7 +2035,6 @@ func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
|
|
|
cs.bufPipe.CloseWithError(err)
|
|
|
cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
|
|
|
}
|
|
|
- delete(rl.activeRes, cs.ID)
|
|
|
return nil
|
|
|
}
|
|
|
|