|
|
@@ -343,10 +343,12 @@ func (c *Conn) recv() error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+ // we either, return a response to the caller, the caller timedout, or the
|
|
|
+ // connection has closed. Either way we should never block indefinatly here
|
|
|
select {
|
|
|
case call.resp <- err:
|
|
|
+ case <-call.timeout:
|
|
|
case <-c.quit:
|
|
|
- return nil
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
@@ -357,6 +359,7 @@ type callReq struct {
|
|
|
resp chan error
|
|
|
framer *framer
|
|
|
waiting int32
|
|
|
+ timeout chan struct{} // indicates to recv() that a call has timedout
|
|
|
}
|
|
|
|
|
|
func (c *Conn) releaseStream(stream int) {
|
|
|
@@ -384,6 +387,7 @@ func (c *Conn) exec(req frameWriter, tracer Tracer) (frame, error) {
|
|
|
// resp is basically a waiting semaphore protecting the framer
|
|
|
framer := newFramer(c, c, c.compressor, c.version)
|
|
|
call.framer = framer
|
|
|
+ call.timeout = make(chan struct{})
|
|
|
|
|
|
if tracer != nil {
|
|
|
framer.trace()
|
|
|
@@ -410,6 +414,7 @@ func (c *Conn) exec(req frameWriter, tracer Tracer) (frame, error) {
|
|
|
return nil, err
|
|
|
}
|
|
|
case <-time.After(c.timeout):
|
|
|
+ close(call.timeout)
|
|
|
c.handleTimeout()
|
|
|
return nil, ErrTimeoutNoResponse
|
|
|
case <-c.quit:
|