Преглед изворни кода

Merge remote-tracking branch 'upstream/master' into closing_connections

Ben Hood пре 11 година
родитељ
комит
1665b39204
1 измењених фајлова са 11 додато и 6 уклоњено
  1. 11 6
      conn.go

+ 11 - 6
conn.go

@@ -82,6 +82,7 @@ type Conn struct {
 	auth       Authenticator
 	addr       string
 	version    uint8
+	isClosed   bool
 }
 
 // Connect establishes a connection to a Cassandra node.
@@ -109,6 +110,7 @@ func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
 		addr:       conn.RemoteAddr().String(),
 		cluster:    cluster,
 		compressor: cfg.Compressor,
+		isClosed:   false,
 		auth:       cfg.Authenticator,
 	}
 
@@ -121,6 +123,7 @@ func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
 	}
 
 	if err := c.startup(&cfg); err != nil {
+		conn.Close()
 		return nil, err
 	}
 
@@ -197,7 +200,7 @@ func (c *Conn) serve() {
 		c.dispatch(resp)
 	}
 
-	c.conn.Close()
+	c.Close()
 	for id := 0; id < len(c.calls); id++ {
 		req := &c.calls[id]
 		if atomic.LoadInt32(&req.active) == 1 {
@@ -248,8 +251,7 @@ func (c *Conn) execSimple(op operation) (interface{}, error) {
 	f, err := op.encodeFrame(c.version, nil)
 	f.setLength(len(f) - headerSize)
 	if _, err := c.conn.Write([]byte(f)); err != nil {
-		c.conn.Close()
-		c.cluster.HandleError(c, err, true)
+		c.Close()
 		return nil, err
 	}
 	if f, err = c.recv(); err != nil {
@@ -283,10 +285,9 @@ func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
 	atomic.AddInt32(&c.nwait, 1)
 	atomic.StoreInt32(&call.active, 1)
 
-	if _, err = c.conn.Write(req); err != nil {
-		c.conn.Close()
-		c.cluster.HandleError(c, err, true)
+	if _, err := c.conn.Write(req); err != nil {
 		c.uniq <- id
+		c.Close()
 		return nil, err
 	}
 
@@ -448,10 +449,14 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 }
 
 func (c *Conn) Pick(qry *Query) *Conn {
+	if c.isClosed || len(c.uniq) == 0 {
+		return nil
+	}
 	return c
 }
 
 func (c *Conn) Close() {
+	c.isClosed = true
 	c.conn.Close()
 }