Explorar o código

control: control connection now detects down hosts

The control connection now tries to connect to the host it disconnected
from to check if it is down, if it is down it informs the session.
Chris Bannister %!s(int64=10) %!d(string=hai) anos
pai
achega
04735e550d
Modificáronse 1 ficheiros con 39 adicións e 16 borrados
  1. 39 16
      control.go

+ 39 - 16
control.go

@@ -6,6 +6,7 @@ import (
 	"log"
 	"log"
 	"math/rand"
 	"math/rand"
 	"net"
 	"net"
+	"strconv"
 	"sync"
 	"sync"
 	"sync/atomic"
 	"sync/atomic"
 	"time"
 	"time"
@@ -118,6 +119,7 @@ func (c *controlConn) connect(endpoints []string) error {
 
 
 	c.conn.Store(conn)
 	c.conn.Store(conn)
 	atomic.StoreInt64(&c.connecting, 0)
 	atomic.StoreInt64(&c.connecting, 0)
+
 	c.closeWg.Add(1)
 	c.closeWg.Add(1)
 	go c.heartBeat()
 	go c.heartBeat()
 
 
@@ -143,6 +145,8 @@ func (c *controlConn) registerEvents(conn *Conn) error {
 }
 }
 
 
 func (c *controlConn) reconnect(refreshring bool) {
 func (c *controlConn) reconnect(refreshring bool) {
+	// TODO: simplify this function, use session.ring to get hosts instead of the
+	// connection pool
 	if !atomic.CompareAndSwapInt64(&c.connecting, 0, 1) {
 	if !atomic.CompareAndSwapInt64(&c.connecting, 0, 1) {
 		return
 		return
 	}
 	}
@@ -160,38 +164,57 @@ func (c *controlConn) reconnect(refreshring bool) {
 		}
 		}
 	}()
 	}()
 
 
+	addr := c.addr()
 	oldConn := c.conn.Load().(*Conn)
 	oldConn := c.conn.Load().(*Conn)
+	if oldConn != nil {
+		oldConn.Close()
+	}
+
+	var newConn *Conn
+	if addr != "" {
+		// try to connect to the old host
+		conn, err := c.session.connect(addr, c)
+		if err != nil {
+			// host is dead
+			// TODO: this is replicated in a few places
+			ip, portStr, _ := net.SplitHostPort(addr)
+			port, _ := strconv.Atoi(portStr)
+			c.session.handleNodeDown(net.ParseIP(ip), port)
+		} else {
+			newConn = conn
+		}
+	}
 
 
 	// TODO: should have our own roundrobbin for hosts so that we can try each
 	// TODO: should have our own roundrobbin for hosts so that we can try each
 	// in succession and guantee that we get a different host each time.
 	// in succession and guantee that we get a different host each time.
-	host, conn := c.session.pool.Pick(nil)
-	if conn == nil {
-		return
-	}
+	if newConn == nil {
+		_, conn := c.session.pool.Pick(nil)
+		if conn == nil {
+			return
+		}
 
 
-	newConn, err := Connect(conn.addr, c.connCfg, c, c.session)
-	if err != nil {
-		host.Mark(err)
-		// TODO: add log handler for things like this
-		return
+		if conn == nil {
+			return
+		}
+
+		var err error
+		newConn, err = c.session.connect(conn.addr, c)
+		if err != nil {
+			// TODO: add log handler for things like this
+			return
+		}
 	}
 	}
 
 
 	if err := c.registerEvents(newConn); err != nil {
 	if err := c.registerEvents(newConn); err != nil {
-		host.Mark(err)
 		// TODO: handle this case better
 		// TODO: handle this case better
 		newConn.Close()
 		newConn.Close()
-		log.Printf("gocql: unable to register events: %v\n", err)
+		log.Printf("gocql: control unable to register events: %v\n", err)
 		return
 		return
 	}
 	}
 
 
 	c.conn.Store(newConn)
 	c.conn.Store(newConn)
-	host.Mark(nil)
 	success = true
 	success = true
 
 
-	if oldConn != nil {
-		oldConn.Close()
-	}
-
 	if refreshring {
 	if refreshring {
 		c.session.hostSource.refreshRing()
 		c.session.hostSource.refreshRing()
 	}
 	}