Browse Source

Fix possible infinite wait on prepare error.

If multiple goroutines try to prepare the same statement at the
same time and the goroutine which prepares the statement errors
it will never release the other goroutines from the waitgroup
wait.

Fix this by storing the error and info in a struct and always
call WaitGroup.Done()
Chris Bannister 12 years ago
parent
commit
34a8a74534
1 changed files with 32 additions and 22 deletions
  1. 32 22
      conn.go

+ 32 - 22
conn.go

@@ -48,7 +48,7 @@ type Conn struct {
 	nwait int32
 	nwait int32
 
 
 	prepMu sync.Mutex
 	prepMu sync.Mutex
-	prep   map[string]*queryInfo
+	prep   map[string]*inflightPrepare
 
 
 	cluster    Cluster
 	cluster    Cluster
 	compressor Compressor
 	compressor Compressor
@@ -73,7 +73,7 @@ func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
 		conn:       conn,
 		conn:       conn,
 		uniq:       make(chan uint8, cfg.NumStreams),
 		uniq:       make(chan uint8, cfg.NumStreams),
 		calls:      make([]callReq, cfg.NumStreams),
 		calls:      make([]callReq, cfg.NumStreams),
-		prep:       make(map[string]*queryInfo),
+		prep:       make(map[string]*inflightPrepare),
 		timeout:    cfg.Timeout,
 		timeout:    cfg.Timeout,
 		version:    uint8(cfg.ProtoVersion),
 		version:    uint8(cfg.ProtoVersion),
 		addr:       conn.RemoteAddr().String(),
 		addr:       conn.RemoteAddr().String(),
@@ -249,32 +249,37 @@ func (c *Conn) ping() error {
 
 
 func (c *Conn) prepareStatement(stmt string, trace Tracer) (*queryInfo, error) {
 func (c *Conn) prepareStatement(stmt string, trace Tracer) (*queryInfo, error) {
 	c.prepMu.Lock()
 	c.prepMu.Lock()
-	info := c.prep[stmt]
-	if info != nil {
+	flight := c.prep[stmt]
+	if flight != nil {
 		c.prepMu.Unlock()
 		c.prepMu.Unlock()
-		info.wg.Wait()
-		return info, nil
+		flight.wg.Wait()
+		return flight.info, flight.err
 	}
 	}
-	info = new(queryInfo)
-	info.wg.Add(1)
-	c.prep[stmt] = info
+
+	flight = new(inflightPrepare)
+	flight.wg.Add(1)
+	c.prep[stmt] = flight
 	c.prepMu.Unlock()
 	c.prepMu.Unlock()
 
 
 	resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
 	resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
 	if err != nil {
 	if err != nil {
-		return nil, err
-	}
-	switch x := resp.(type) {
-	case resultPreparedFrame:
-		info.id = x.PreparedId
-		info.args = x.Values
-		info.wg.Done()
-	case error:
-		return nil, x
-	default:
-		return nil, ErrProtocol
+		flight.err = err
+	} else {
+		switch x := resp.(type) {
+		case resultPreparedFrame:
+			flight.info = &queryInfo{
+				id:   x.PreparedId,
+				args: x.Values,
+			}
+		case error:
+			flight.err = x
+		default:
+			flight.err = ErrProtocol
+		}
 	}
 	}
-	return info, nil
+
+	flight.wg.Done()
+	return flight.info, flight.err
 }
 }
 
 
 func (c *Conn) executeQuery(qry *Query) *Iter {
 func (c *Conn) executeQuery(qry *Query) *Iter {
@@ -493,7 +498,6 @@ type queryInfo struct {
 	id   []byte
 	id   []byte
 	args []ColumnInfo
 	args []ColumnInfo
 	rval []ColumnInfo
 	rval []ColumnInfo
-	wg   sync.WaitGroup
 }
 }
 
 
 type callReq struct {
 type callReq struct {
@@ -512,6 +516,12 @@ type Compressor interface {
 	Decode(data []byte) ([]byte, error)
 	Decode(data []byte) ([]byte, error)
 }
 }
 
 
+type inflightPrepare struct {
+	info *queryInfo
+	err  error
+	wg   sync.WaitGroup
+}
+
 // SnappyCompressor implements the Compressor interface and can be used to
 // SnappyCompressor implements the Compressor interface and can be used to
 // compress incoming and outgoing frames. The snappy compression algorithm
 // compress incoming and outgoing frames. The snappy compression algorithm
 // aims for very high speeds and reasonable compression.
 // aims for very high speeds and reasonable compression.