|
|
@@ -337,6 +337,7 @@ func (c *Conn) recv() error {
|
|
|
select {
|
|
|
case call.resp <- err:
|
|
|
default:
|
|
|
+ c.releaseStream(head.stream)
|
|
|
// in case the caller timedout
|
|
|
}
|
|
|
|
|
|
@@ -365,7 +366,6 @@ func (c *Conn) handleTimeout() {
|
|
|
func (c *Conn) exec(req frameWriter, tracer Tracer) (frame, error) {
|
|
|
// TODO: move tracer onto conn
|
|
|
stream := <-c.uniq
|
|
|
- defer c.releaseStream(stream)
|
|
|
|
|
|
call := &c.calls[stream]
|
|
|
// resp is basically a waiting semaphore protecting the framer
|
|
|
@@ -383,6 +383,11 @@ func (c *Conn) exec(req frameWriter, tracer Tracer) (frame, error) {
|
|
|
|
|
|
err = <-call.resp
|
|
|
|
|
|
+ // dont release the stream if detect a timeout as another request can reuse
|
|
|
+ // that stream and get a response for the old request, which we have no
|
|
|
+ // easy way of detecting.
|
|
|
+ defer c.releaseStream(stream)
|
|
|
+
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|