瀏覽代碼

conn: ensure that callers unblock correctly

Make the response channel unbuffered so that there is an
explicit synchronisation point between exec() and recv(), this
prevents a case where a timeout could happend and recv() will still
send a response to the waiting response channel even though the caller
is not receving.

In closeWithError if the error is nil dont send it to waiting callers
because they will try to access the response frame, instead close
the quit channel which will indicate the connection has been closed
and should no longer wait for a response.
Chris Bannister 10 年之前
父節點
當前提交
ff272b077d
共有 1 個文件被更改,包括 15 次插入10 次删除
  1. 15 10
      conn.go

+ 15 - 10
conn.go

@@ -180,7 +180,7 @@ func Connect(addr string, cfg ConnConfig, errorHandler ConnErrorHandler) (*Conn,
 	}
 
 	for i := 0; i < cfg.NumStreams; i++ {
-		c.calls[i].resp = make(chan error, 1)
+		c.calls[i].resp = make(chan error)
 		c.uniq <- i
 	}
 
@@ -622,20 +622,25 @@ func (c *Conn) closeWithError(err error) {
 		return
 	}
 
-	for id := 0; id < len(c.calls); id++ {
-		req := &c.calls[id]
-		// we need to send the error to all waiting queries, put the state
-		// of this conn into not active so that it can not execute any queries.
-		atomic.StoreInt32(&req.waiting, -1)
+	if err != nil {
+		// we should attempt to deliver the error back to the caller if it
+		// exists
+		for id := 0; id < len(c.calls); id++ {
+			req := &c.calls[id]
+			// we need to send the error to all waiting queries, put the state
+			// of this conn into not active so that it can not execute any queries.
+			atomic.StoreInt32(&req.waiting, -1)
 
-		if err != nil {
-			select {
-			case req.resp <- err:
-			default:
+			if err != nil {
+				select {
+				case req.resp <- err:
+				default:
+				}
 			}
 		}
 	}
 
+	// if error was nil then unblock the quit channel
 	close(c.quit)
 	c.conn.Close()