|
@@ -345,20 +345,24 @@ func (c *conn) Flush() error {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (c *conn) Receive() (reply interface{}, err error) {
|
|
func (c *conn) Receive() (reply interface{}, err error) {
|
|
|
- c.mu.Lock()
|
|
|
|
|
- // There can be more receives than sends when using pub/sub. To allow
|
|
|
|
|
- // normal use of the connection after unsubscribe from all channels, do not
|
|
|
|
|
- // decrement pending to a negative value.
|
|
|
|
|
- if c.pending > 0 {
|
|
|
|
|
- c.pending -= 1
|
|
|
|
|
- }
|
|
|
|
|
- c.mu.Unlock()
|
|
|
|
|
if c.readTimeout != 0 {
|
|
if c.readTimeout != 0 {
|
|
|
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
|
|
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
|
|
|
}
|
|
}
|
|
|
if reply, err = c.readReply(); err != nil {
|
|
if reply, err = c.readReply(); err != nil {
|
|
|
return nil, c.fatal(err)
|
|
return nil, c.fatal(err)
|
|
|
}
|
|
}
|
|
|
|
|
+ // When using pub/sub, the number of receives can be greater than the
|
|
|
|
|
+ // number of sends. To enable normal use of the connection after
|
|
|
|
|
+ // unsubscribing from all channels, we do not decrement pending to a
|
|
|
|
|
+ // negative value.
|
|
|
|
|
+ //
|
|
|
|
|
+ // The pending field is decremented after the reply is read to handle the
|
|
|
|
|
+ // case where Receive is called before Send.
|
|
|
|
|
+ c.mu.Lock()
|
|
|
|
|
+ if c.pending > 0 {
|
|
|
|
|
+ c.pending -= 1
|
|
|
|
|
+ }
|
|
|
|
|
+ c.mu.Unlock()
|
|
|
if err, ok := reply.(Error); ok {
|
|
if err, ok := reply.(Error); ok {
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|