
Merge pull request #610 from Zariel/events-queryable

control: handle no hosts in connection pool
Chris Bannister committed 10 years ago
commit 6ff76c9630
3 changed files with 130 additions and 65 deletions
  1. control.go (+73 -65)
  2. events_ccm_test.go (+53 -0)
  3. session.go (+4 -0)
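
The headline fix is in `controlConn.reconnect`: when the control connection is lost and the connection pool has no replacement host, the code now falls back to redialing the initial ring endpoints instead of silently returning. A deliberately simplified, self-contained sketch of that control flow (string "connections" stand in for `*Conn`, and `dial`/`reconnect` are illustrative names, not gocql's):

```go
package main

import (
	"errors"
	"fmt"
)

// dial is a stand-in for establishing a control connection.
func dial(addr string) (string, error) {
	if addr == "" {
		return "", errors.New("no address")
	}
	return "conn:" + addr, nil
}

// reconnect prefers a live connection from the pool, but when the pool
// is empty it falls back to the initial endpoints rather than giving up,
// which is the behaviour this commit adds to controlConn.reconnect.
func reconnect(pool []string, endpoints []string) (string, error) {
	if len(pool) > 0 {
		return dial(pool[0])
	}
	for _, addr := range endpoints {
		if conn, err := dial(addr); err == nil {
			return conn, nil
		}
	}
	return "", errors.New("gocql: unable to connect to initial endpoints")
}

func main() {
	// Empty pool: the fallback path dials the ring endpoints directly.
	conn, err := reconnect(nil, []string{"10.0.0.1"})
	fmt.Println(conn, err)
}
```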

control.go (+73 -65)

@@ -1,28 +1,39 @@
 package gocql
 
 import (
+	crand "crypto/rand"
 	"errors"
 	"fmt"
 	"log"
 	"math/rand"
 	"net"
 	"strconv"
-	"sync"
 	"sync/atomic"
 	"time"
 )
 
+var (
+	randr *rand.Rand
+)
+
+func init() {
+	b := make([]byte, 4)
+	if _, err := crand.Read(b); err != nil {
+		panic(fmt.Sprintf("unable to seed random number generator: %v", err))
+	}
+
+	randr = rand.New(rand.NewSource(int64(readInt(b))))
+}
+
 // Ensure that the atomic variable is aligned to a 64bit boundary
 // so that atomic operations can be applied on 32bit architectures.
 type controlConn struct {
-	connecting int64
-
 	session *Session
 	conn    atomic.Value
 
 	retry RetryPolicy
 
-	closeWg sync.WaitGroup
+	started int32
 	quit    chan struct{}
 }
 
@@ -39,13 +50,17 @@ func createControlConn(session *Session) *controlConn {
 }
 
 func (c *controlConn) heartBeat() {
-	defer c.closeWg.Done()
+	if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
+		return
+	}
+
+	sleepTime := 1 * time.Second
 
 	for {
 		select {
 		case <-c.quit:
 			return
-		case <-time.After(5 * time.Second):
+		case <-time.After(sleepTime):
 		}
 
 		resp, err := c.writeFrame(&writeOptionsFrame{})
@@ -55,6 +70,8 @@ func (c *controlConn) heartBeat() {
 
 		switch resp.(type) {
 		case *supportedFrame:
+			// Everything ok
+			sleepTime = 5 * time.Second
 			continue
 		case error:
 			goto reconn
@@ -63,65 +80,79 @@ func (c *controlConn) heartBeat() {
 		}
 
 	reconn:
+		// try to connect a bit faster
+		sleepTime = 1 * time.Second
 		c.reconnect(true)
 		// time.Sleep(5 * time.Second)
 		continue
 	}
 }
 
-func (c *controlConn) connect(endpoints []string) error {
-	// initial connection attempt, try to connect to each endpoint to get an initial
-	// list of nodes.
-
-	// shuffle endpoints so not all drivers will connect to the same initial
-	// node.
-	r := rand.New(rand.NewSource(time.Now().UnixNano()))
-	perm := r.Perm(len(endpoints))
+func (c *controlConn) shuffleDial(endpoints []string) (conn *Conn, err error) {
+	perm := randr.Perm(len(endpoints))
 	shuffled := make([]string, len(endpoints))
 
 	for i, endpoint := range endpoints {
 		shuffled[perm[i]] = endpoint
 	}
 
-	// store that we are not connected so that reconnect wont happen if we error
-	atomic.StoreInt64(&c.connecting, -1)
-
-	var (
-		conn *Conn
-		err  error
-	)
-
+	// shuffle endpoints so not all drivers will connect to the same initial
+	// node.
 	for _, addr := range shuffled {
 		conn, err = c.session.connect(JoinHostPort(addr, c.session.cfg.Port), c)
-		if err != nil {
-			log.Printf("gocql: unable to control conn dial %v: %v\n", addr, err)
-			continue
+		if err == nil {
+			return
 		}
 
-		if err = c.registerEvents(conn); err != nil {
-			conn.Close()
-			continue
-		}
+		log.Printf("gocql: unable to control conn dial %v: %v\n", addr, err)
+	}
 
-		// we should fetch the initial ring here and update initial host data. So that
-		// when we return from here we have a ring topology ready to go.
-		break
+	return
+}
+
+func (c *controlConn) connect(endpoints []string) error {
+	conn, err := c.shuffleDial(endpoints)
+	if err != nil {
+		return err
+	} else if conn == nil {
+		return errors.New("gocql: unable to connect to initial endpoints")
 	}
 
-	if conn == nil {
-		// this is fatal, not going to connect a session
+	if err := c.setupConn(conn); err != nil {
+		conn.Close()
 		return err
 	}
 
-	c.conn.Store(conn)
-	atomic.StoreInt64(&c.connecting, 0)
+	// we could fetch the initial ring here and update the initial host data, so
+	// that when we return from here we have a ring topology ready to go.
 
-	c.closeWg.Add(1)
 	go c.heartBeat()
 
 	return nil
 }
 
+func (c *controlConn) setupConn(conn *Conn) error {
+	if err := c.registerEvents(conn); err != nil {
+		conn.Close()
+		return err
+	}
+
+	c.conn.Store(conn)
+
+	host, portstr, err := net.SplitHostPort(conn.conn.RemoteAddr().String())
+	if err != nil {
+		return err
+	}
+	port, err := strconv.Atoi(portstr)
+	if err != nil {
+		return err
+	}
+
+	go c.session.handleNodeUp(net.ParseIP(host), port, true)
+
+	return nil
+}
+
 func (c *controlConn) registerEvents(conn *Conn) error {
 	var events []string
 
@@ -159,22 +190,6 @@ func (c *controlConn) registerEvents(conn *Conn) error {
 func (c *controlConn) reconnect(refreshring bool) {
 	// TODO: simplify this function, use session.ring to get hosts instead of the
 	// connection pool
-	if !atomic.CompareAndSwapInt64(&c.connecting, 0, 1) {
-		return
-	}
-
-	success := false
-	defer func() {
-		// debounce reconnect a little
-		if success {
-			go func() {
-				time.Sleep(500 * time.Millisecond)
-				atomic.StoreInt64(&c.connecting, 0)
-			}()
-		} else {
-			atomic.StoreInt64(&c.connecting, 0)
-		}
-	}()
 
 	addr := c.addr()
 	oldConn := c.conn.Load().(*Conn)
@@ -202,10 +217,7 @@ func (c *controlConn) reconnect(refreshring bool) {
 	if newConn == nil {
 		_, conn := c.session.pool.Pick(nil)
 		if conn == nil {
-			return
-		}
-
-		if conn == nil {
+			c.connect(c.session.ring.endpoints)
 			return
 		}
 
@@ -217,16 +229,12 @@ func (c *controlConn) reconnect(refreshring bool) {
 		}
 	}
 
-	if err := c.registerEvents(newConn); err != nil {
-		// TODO: handle this case better
+	if err := c.setupConn(newConn); err != nil {
 		newConn.Close()
 		log.Printf("gocql: control unable to register events: %v\n", err)
 		return
 	}
 
-	c.conn.Store(newConn)
-	success = true
-
 	if refreshring {
 		c.session.hostSource.refreshRing()
 	}
@@ -355,9 +363,9 @@ func (c *controlConn) addr() string {
 }
 
 func (c *controlConn) close() {
-	// TODO: handle more gracefully
-	close(c.quit)
-	c.closeWg.Wait()
+	if atomic.CompareAndSwapInt32(&c.started, 1, -1) {
+		c.quit <- struct{}{}
+	}
 	conn := c.conn.Load().(*Conn)
 	if conn != nil {
 		conn.Close()

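The new `init` seeds a package-level `math/rand.Rand` from `crypto/rand` rather than the wall clock, so two drivers started at the same instant don't produce the same endpoint permutation in `shuffleDial`. The `readInt` helper it calls is gocql-internal and not shown in this diff; here is a minimal self-contained sketch of the same idea, substituting 8 bytes and `encoding/binary` for that helper (an assumption for illustration, not gocql's actual code):

```go
package main

import (
	crand "crypto/rand"
	"encoding/binary"
	"fmt"
	"math/rand"
)

// newSeededRand builds a math/rand.Rand whose seed comes from the OS
// CSPRNG instead of the clock, so two processes started in the same
// nanosecond still get different permutations.
func newSeededRand() (*rand.Rand, error) {
	b := make([]byte, 8)
	if _, err := crand.Read(b); err != nil {
		return nil, fmt.Errorf("unable to seed random number generator: %v", err)
	}
	seed := int64(binary.BigEndian.Uint64(b))
	return rand.New(rand.NewSource(seed)), nil
}

func main() {
	r, err := newSeededRand()
	if err != nil {
		panic(err)
	}
	// Perm is what shuffleDial uses to spread initial connection
	// attempts across the endpoint list.
	fmt.Println(r.Perm(5))
}
```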
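The `sync.WaitGroup` plus `connecting` flag bookkeeping is replaced by a single `started int32` driven by compare-and-swap: `heartBeat` runs at most once (CAS 0 to 1), and `close` only signals `quit` if the loop actually started (CAS 1 to -1), so closing a control connection whose heartbeat never launched cannot block. A standalone sketch of that lifecycle pattern (`worker`, `run`, and `stop` are illustrative names, not gocql's):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type worker struct {
	started int32
	quit    chan struct{}
}

// run starts the loop at most once: the CAS from 0 to 1 fails for every
// caller but the first, and also fails after stop has stored -1.
func (w *worker) run() {
	if !atomic.CompareAndSwapInt32(&w.started, 0, 1) {
		return
	}
	for {
		select {
		case <-w.quit:
			return
		case <-time.After(100 * time.Millisecond):
			fmt.Println("heartbeat")
		}
	}
}

// stop only sends on quit when the loop is actually running (1 -> -1),
// so stopping a worker that never started does not block forever.
func (w *worker) stop() {
	if atomic.CompareAndSwapInt32(&w.started, 1, -1) {
		w.quit <- struct{}{}
	}
}

func main() {
	w := &worker{quit: make(chan struct{})}
	go w.run()
	time.Sleep(250 * time.Millisecond)
	w.stop()
}
```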
events_ccm_test.go (+53 -0)

@@ -236,3 +236,56 @@ func TestEventFilter(t *testing.T) {
 	}
 
 }
+
+func TestEventDownQueryable(t *testing.T) {
+	if err := ccm.AllUp(); err != nil {
+		t.Fatal(err)
+	}
+
+	status, err := ccm.Status()
+	if err != nil {
+		t.Fatal(err)
+	}
+	log.Printf("status=%+v\n", status)
+
+	const targetNode = "node1"
+
+	addr := status[targetNode].Addr
+
+	cluster := createCluster()
+	cluster.Hosts = []string{addr}
+	cluster.HostFilter = WhiteListHostFilter(addr)
+	session := createSessionFromCluster(cluster, t)
+	defer session.Close()
+
+	if pool, ok := session.pool.getPool(addr); !ok {
+		t.Fatalf("should have %v in pool but don't", addr)
+	} else if !pool.host.IsUp() {
+		t.Fatalf("host is not up %v", pool.host)
+	}
+
+	if err := ccm.NodeDown(targetNode); err != nil {
+		t.Fatal(err)
+	}
+
+	time.Sleep(5 * time.Second)
+
+	if err := ccm.NodeUp(targetNode); err != nil {
+		t.Fatal(err)
+	}
+
+	time.Sleep(15 * time.Second)
+
+	if pool, ok := session.pool.getPool(addr); !ok {
+		t.Fatalf("should have %v in pool but don't", addr)
+	} else if !pool.host.IsUp() {
+		t.Fatalf("host is not up %v", pool.host)
+	}
+
+	var rows int
+	if err := session.Query("SELECT COUNT(*) FROM system.local").Scan(&rows); err != nil {
+		t.Fatal(err)
+	} else if rows != 1 {
+		t.Fatalf("expected to get 1 row got %d", rows)
+	}
+}

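The test pins the session to a single node with `cluster.Hosts` plus `WhiteListHostFilter`, then verifies the node is queryable again after a ccm down/up cycle. Outside the test harness, the same pinning looks roughly like this minimal sketch (the address is a placeholder for a reachable node):

```go
package main

import (
	"fmt"
	"log"

	"github.com/gocql/gocql"
)

func main() {
	// 127.0.0.1 is a placeholder; point this at a real node.
	const addr = "127.0.0.1"

	cluster := gocql.NewCluster(addr)
	// Restrict the driver to this single host, as the test does, so
	// events for other nodes cannot repopulate the pool.
	cluster.HostFilter = gocql.WhiteListHostFilter(addr)

	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	var count int
	if err := session.Query("SELECT COUNT(*) FROM system.local").Scan(&count); err != nil {
		log.Fatal(err)
	}
	fmt.Println("rows in system.local:", count)
}
```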
session.go (+4 -0)

@@ -259,6 +259,10 @@ func (s *Session) Close() {
 	if s.nodeEvents != nil {
 		s.nodeEvents.stop()
 	}
+
+	if s.schemaEvents != nil {
+		s.schemaEvents.stop()
+	}
 }
 
 func (s *Session) Closed() bool {