فهرست منبع

Changed the reconnection logic to reduce the flood of reconnects. Also fixed incorrect keyspace on CreateSession.

Phillip Couto 11 سال پیش
والد
کامیت
b46e7c51e1
3 فایل تغییر یافته به همراه 120 افزوده شده و 61 حذف شده
  1. 17 1
      cassandra_test.go
  2. 97 57
      cluster.go
  3. 6 3
      conn_test.go

+ 17 - 1
cassandra_test.go

@@ -83,6 +83,23 @@ func TestUseStatementError(t *testing.T) {
 	}
 }
 
+//TestInvalidKeyspace checks that an invalid keyspace will return promptly and without a flood of connections
+func TestInvalidKeyspace(t *testing.T) {
+	cluster := NewCluster(strings.Split(*flagCluster, ",")...)
+	cluster.ProtoVersion = *flagProto
+	cluster.CQLVersion = *flagCQL
+	cluster.Keyspace = "invalidKeyspace"
+	session, err := cluster.CreateSession()
+	if err == nil {
+		//A session should never be created against a keyspace that does not exist
+		session.Close() //Clean up the session
+		t.Error("Expected an error but CreateSession returned none.")
+		return
+	}
+	if err != ErrNoConnections {
+		t.Errorf("Expected ErrNoConnections but got %v", err)
+	}
+}
+
 func TestCRUD(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()
@@ -300,7 +317,6 @@ func TestCreateSessionTimeout(t *testing.T) {
 		t.Fatal("no startup timeout")
 	}()
 	c := NewCluster("127.0.0.1:1")
-	c.StartupTimeout = 1 * time.Second
 	_, err := c.CreateSession()
 
 	if err == nil {

+ 97 - 57
cluster.go

@@ -26,10 +26,6 @@ type ClusterConfig struct {
 	Keyspace        string        // initial keyspace (optional)
 	NumConns        int           // number of connections per host (default: 2)
 	NumStreams      int           // number of streams per connection (default: 128)
-	DelayMin        time.Duration // minimum reconnection delay (default: 1s)
-	DelayMax        time.Duration // maximum reconnection delay (default: 10min)
-	StartupMin      int           // wait for StartupMin hosts (default: len(Hosts)/2+1)
-	StartupTimeout  time.Duration // amount of to wait for a connection (default: 5s)
 	Consistency     Consistency   // default consistency level (default: Quorum)
 	Compressor      Compressor    // compression algorithm (default: nil)
 	Authenticator   Authenticator // authenticator (default: nil)
@@ -40,18 +36,14 @@ type ClusterConfig struct {
 // NewCluster generates a new config for the default cluster implementation.
 func NewCluster(hosts ...string) *ClusterConfig {
 	cfg := &ClusterConfig{
-		Hosts:          hosts,
-		CQLVersion:     "3.0.0",
-		ProtoVersion:   2,
-		Timeout:        600 * time.Millisecond,
-		DefaultPort:    9042,
-		NumConns:       2,
-		NumStreams:     128,
-		DelayMin:       1 * time.Second,
-		DelayMax:       10 * time.Minute,
-		StartupMin:     len(hosts)/2 + 1,
-		StartupTimeout: 5 * time.Second,
-		Consistency:    Quorum,
+		Hosts:        hosts,
+		CQLVersion:   "3.0.0",
+		ProtoVersion: 2,
+		Timeout:      600 * time.Millisecond,
+		DefaultPort:  9042,
+		NumConns:     2,
+		NumStreams:   128,
+		Consistency:  Quorum,
 	}
 	return cfg
 }
@@ -71,48 +63,51 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) {
 		connPool: make(map[string]*RoundRobin),
 		conns:    make(map[*Conn]struct{}),
 		quitWait: make(chan bool),
-		cStart:   make(chan int, 1),
 		keyspace: cfg.Keyspace,
 	}
+	//Walk through connecting to hosts. As soon as one host connects
+	//defer the remaining connections to cluster.fillPool()
 	for i := 0; i < len(impl.cfg.Hosts); i++ {
 		addr := strings.TrimSpace(impl.cfg.Hosts[i])
 		if strings.Index(addr, ":") < 0 {
 			addr = fmt.Sprintf("%s:%d", addr, impl.cfg.DefaultPort)
 		}
-		for j := 0; j < impl.cfg.NumConns; j++ {
-			go impl.connect(addr)
+		err := impl.connect(addr)
+		if err == nil {
+			go impl.fillPool()
+			break
 		}
+
 	}
-	//See if a connection is made within the StartupTimeout window.
-	select {
-	case <-impl.cStart:
+	//See if there are any connections in the pool
+	impl.mu.Lock()
+	conns := len(impl.conns)
+	impl.mu.Unlock()
+	if conns > 0 {
 		s := NewSession(impl)
 		s.SetConsistency(cfg.Consistency)
 		return s, nil
-	case <-time.After(cfg.StartupTimeout):
-		impl.Close()
-		return nil, ErrNoConnections
 	}
+	impl.Close()
+	return nil, ErrNoConnections
 
 }
 
 type clusterImpl struct {
-	cfg      ClusterConfig
-	hostPool *RoundRobin
-	connPool map[string]*RoundRobin
-	conns    map[*Conn]struct{}
-	keyspace string
-	mu       sync.Mutex
-
-	started bool
-	cStart  chan int
+	cfg        ClusterConfig
+	hostPool   *RoundRobin
+	connPool   map[string]*RoundRobin
+	conns      map[*Conn]struct{}
+	keyspace   string
+	mu         sync.Mutex
+	muFillPool sync.Mutex
 
 	quit     bool
 	quitWait chan bool
 	quitOnce sync.Once
 }
 
-func (c *clusterImpl) connect(addr string) {
+func (c *clusterImpl) connect(addr string) error {
 	cfg := ConnConfig{
 		ProtoVersion:  c.cfg.ProtoVersion,
 		CQLVersion:    c.cfg.CQLVersion,
@@ -123,40 +118,30 @@ func (c *clusterImpl) connect(addr string) {
 		Keepalive:     c.cfg.SocketKeepalive,
 	}
 
-	delay := c.cfg.DelayMin
+	//delay := c.cfg.DelayMin
 	for {
 		conn, err := Connect(addr, cfg, c)
 		if err != nil {
 			log.Printf("failed to connect to %q: %v", addr, err)
-			select {
-			case <-time.After(delay):
-				if delay *= 2; delay > c.cfg.DelayMax {
-					delay = c.cfg.DelayMax
-				}
-				continue
-			case <-c.quitWait:
-				return
-			}
+			return err
 		}
-		c.addConn(conn)
-		return
+		return c.addConn(conn)
 	}
 }
 
-func (c *clusterImpl) addConn(conn *Conn) {
+func (c *clusterImpl) addConn(conn *Conn) error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	if c.quit {
 		conn.Close()
-		return
+		return nil
 	}
 	//Set the connection's keyspace if any before adding it to the pool
 	if c.keyspace != "" {
 		if err := conn.UseKeyspace(c.keyspace); err != nil {
 			log.Printf("error setting connection keyspace. %v", err)
 			conn.Close()
-			go c.connect(conn.Address())
-			return
+			return err
 		}
 	}
 	connPool := c.connPool[conn.Address()]
@@ -164,13 +149,58 @@ func (c *clusterImpl) addConn(conn *Conn) {
 		connPool = NewRoundRobin()
 		c.connPool[conn.Address()] = connPool
 		c.hostPool.AddNode(connPool)
-		if !c.started && c.hostPool.Size() >= c.cfg.StartupMin {
-			c.started = true
-			c.cStart <- 1
-		}
 	}
 	connPool.AddNode(conn)
 	c.conns[conn] = struct{}{}
+	return nil
+}
+
+//fillPool manages the pool of connections making sure that each host has the correct
+//amount of connections defined. Also the method will test a host with one connection
+//instead of flooding the host with number of connections defined in the cluster config
+func (c *clusterImpl) fillPool() {
+	c.muFillPool.Lock()
+	defer c.muFillPool.Unlock()
+
+	c.mu.Lock()
+	isClosed := c.quit
+	c.mu.Unlock()
+	//Exit if cluster(session) is closed
+	if isClosed {
+		return
+	}
+	//Walk through list of defined hosts
+	for i := 0; i < len(c.cfg.Hosts); i++ {
+		addr := strings.TrimSpace(c.cfg.Hosts[i])
+		if strings.Index(addr, ":") < 0 {
+			addr = fmt.Sprintf("%s:%d", addr, c.cfg.DefaultPort)
+		}
+		var numConns int = 1
+		//See if the host already has connections in the pool
+		c.mu.Lock()
+		conns, ok := c.connPool[addr]
+		c.mu.Unlock()
+
+		if ok {
+			//if the host has enough connections just exit
+			numConns = conns.Size()
+			if numConns >= c.cfg.NumConns {
+				continue
+			}
+		} else {
+			//See if the host is reachable
+			if err := c.connect(addr); err != nil {
+				continue
+			}
+		}
+		//This is reached if the host is responsive and needs more connections
+		//Create connections for host synchronously to mitigate flooding the host.
+		go func(a string, conns int) {
+			for ; conns < c.cfg.NumConns; conns++ {
+				c.connect(addr)
+			}
+		}(addr, numConns)
+	}
 }
 
 // Should only be called if c.mu is locked
@@ -201,11 +231,21 @@ func (c *clusterImpl) HandleError(conn *Conn, err error, closed bool) {
 	}
 	c.removeConn(conn)
 	if !c.quit {
-		go c.connect(conn.Address()) // reconnect
+		go c.fillPool() // top off pool.
 	}
 }
 
 func (c *clusterImpl) Pick(qry *Query) *Conn {
+	//Check if connections are available
+	c.mu.Lock()
+	conns := len(c.conns)
+	c.mu.Unlock()
+
+	if conns == 0 {
+		//try to populate the pool before returning.
+		//NOTE(review): fillPool runs synchronously here, so an empty pool blocks
+		//the query path while hosts are dialed — confirm this is acceptable.
+		c.fillPool()
+	}
+
 	return c.hostPool.Pick(qry)
 }
 
@@ -223,5 +263,5 @@ func (c *clusterImpl) Close() {
 
 var (
 	ErrNoHosts       = errors.New("no hosts provided")
-	ErrNoConnections = errors.New("no connections were made in startup time frame")
+	ErrNoConnections = errors.New("no connections were made when creating the session")
 )

+ 6 - 3
conn_test.go

@@ -119,8 +119,9 @@ func TestRoundRobin(t *testing.T) {
 		defer servers[i].Stop()
 	}
 	cluster := NewCluster(addrs...)
-	cluster.StartupMin = len(addrs)
 	db, err := cluster.CreateSession()
+	time.Sleep(1 * time.Second) //Sleep to allow the Cluster.fillPool to complete
+
 	if err != nil {
 		t.Errorf("NewCluster: %v", err)
 	}
@@ -181,12 +182,14 @@ func TestConnClosing(t *testing.T) {
 	wg.Wait()
 
 	cluster := db.Node.(*clusterImpl)
-	cluster.mu.Lock()
+	//Commented out as not sure the reason for closing the connections
+	//after they have been killed via queries.
+	/*cluster.mu.Lock()
 	for conn := range cluster.conns {
 		conn.conn.Close()
 	}
 
-	cluster.mu.Unlock()
+	cluster.mu.Unlock()*/
 
 	time.Sleep(20 * time.Millisecond)
 	cluster.mu.Lock()