
Added code to debounce requests to fillPool when calls are in a tight
loop. Cleaned up code that was commented out in the first commit.
Adjusted error message in TestInvalidKeyspace test case.

Phillip Couto committed 11 years ago
commit 1bce19d061
3 changed files with 26 additions and 26 deletions
  1. cassandra_test.go (+1 -1)
  2. cluster.go (+23 -16)
  3. conn_test.go (+2 -9)
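
For context, a minimal standalone sketch of the debounce pattern this commit introduces in cluster.go (the names here are illustrative, not from the commit): a buffered channel of capacity one holds a single token, and each call races a short timer against acquiring it, so calls arriving in a tight loop while the pool is already being filled return immediately instead of piling up.

	package main

	import (
		"fmt"
		"time"
	)

	// debounced mimics the token-channel pattern this commit adds to fillPool:
	// a buffered channel of capacity 1 holds a single token. Callers race a
	// short timer against acquiring the token; if another caller already holds
	// it, the timer fires first and the call returns without doing any work.
	type debounced struct {
		token chan int
	}

	func newDebounced() *debounced {
		d := &debounced{token: make(chan int, 1)}
		d.token <- 1 // seed the token so the first call can proceed
		return d
	}

	func (d *debounced) run(work func()) {
		select {
		case <-time.After(1 * time.Millisecond):
			return // someone else holds the token; drop this request
		case <-d.token:
			defer func() { d.token <- 1 }() // hand the token back when done
		}
		work()
	}

	func main() {
		d := newDebounced()
		for i := 0; i < 5; i++ {
			go d.run(func() {
				time.Sleep(10 * time.Millisecond) // simulate pool filling
				fmt.Println("filled pool")
			})
		}
		time.Sleep(100 * time.Millisecond) // "filled pool" prints once
	}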

cassandra_test.go (+1 -1)

@@ -96,7 +96,7 @@ func TestInvalidKeyspace(t *testing.T) {
 		}
 	} else {
 		session.Close() //Clean up the session
-		t.Error("Expected an error but CreateSession returned none.")
+		t.Error("expected err, got nil.")
 	}
 }
 

cluster.go (+23 -16)

@@ -58,12 +58,13 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) {
 	}
 
 	impl := &clusterImpl{
-		cfg:      *cfg,
-		hostPool: NewRoundRobin(),
-		connPool: make(map[string]*RoundRobin),
-		conns:    make(map[*Conn]struct{}),
-		quitWait: make(chan bool),
-		keyspace: cfg.Keyspace,
+		cfg:          *cfg,
+		hostPool:     NewRoundRobin(),
+		connPool:     make(map[string]*RoundRobin),
+		conns:        make(map[*Conn]struct{}),
+		quitWait:     make(chan bool),
+		cFillingPool: make(chan int, 1),
+		keyspace:     cfg.Keyspace,
 	}
 	//Walk through connecting to hosts. As soon as one host connects
 	//defer the remaining connections to cluster.fillPool()
@@ -74,6 +75,7 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) {
 		}
 		err := impl.connect(addr)
 		if err == nil {
+			impl.cFillingPool <- 1
 			go impl.fillPool()
 			break
 		}
@@ -94,13 +96,14 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) {
 }
 
 type clusterImpl struct {
-	cfg        ClusterConfig
-	hostPool   *RoundRobin
-	connPool   map[string]*RoundRobin
-	conns      map[*Conn]struct{}
-	keyspace   string
-	mu         sync.Mutex
-	muFillPool sync.Mutex
+	cfg      ClusterConfig
+	hostPool *RoundRobin
+	connPool map[string]*RoundRobin
+	conns    map[*Conn]struct{}
+	keyspace string
+	mu       sync.Mutex
+
+	cFillingPool chan int
 
 	quit     bool
 	quitWait chan bool
@@ -118,7 +121,6 @@ func (c *clusterImpl) connect(addr string) error {
 		Keepalive:     c.cfg.SocketKeepalive,
 	}
 
-	//delay := c.cfg.DelayMin
 	for {
 		conn, err := Connect(addr, cfg, c)
 		if err != nil {
@@ -159,8 +161,13 @@ func (c *clusterImpl) addConn(conn *Conn) error {
 //amount of connections defined. Also the method will test a host with one connection
 //instead of flooding the host with number of connections defined in the cluster config
 func (c *clusterImpl) fillPool() {
-	c.muFillPool.Lock()
-	defer c.muFillPool.Unlock()
+	//Debounce large amounts of requests to fill pool
+	select {
+	case <-time.After(1 * time.Millisecond):
+		return
+	case <-c.cFillingPool:
+		defer func() { c.cFillingPool <- 1 }()
+	}
 
 	c.mu.Lock()
 	isClosed := c.quit
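
The select above is the whole debounce: cFillingPool is a buffered channel of capacity one holding a single token. The call that receives the token proceeds to fill the pool and hands the token back through the deferred send; any fillPool call arriving while the token is held loses the race to the 1 ms timer and returns without doing work. This also explains the change in CreateSession: the channel must be seeded with impl.cFillingPool <- 1 before the first fillPool goroutine starts, since with an empty channel every call would hit the timeout and the pool would never fill.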

conn_test.go (+2 -9)

@@ -166,6 +166,7 @@ func TestConnClosing(t *testing.T) {
 	if err != nil {
 		t.Errorf("NewCluster: %v", err)
 	}
+	defer db.Close()
 
 	numConns := db.cfg.NumConns
 	count := db.cfg.NumStreams * numConns
@@ -181,17 +182,9 @@ func TestConnClosing(t *testing.T) {
 
 	wg.Wait()
 
+	time.Sleep(1 * time.Second) //Sleep so the fillPool can complete.
 	cluster := db.Node.(*clusterImpl)
-	//Commented out as not sure the reason for closing the connections
-	//after they have been killed via queries.
-	/*cluster.mu.Lock()
-	for conn := range cluster.conns {
-		conn.conn.Close()
-	}
-
-	cluster.mu.Unlock()*/
 
-	time.Sleep(20 * time.Millisecond)
 	cluster.mu.Lock()
 	conns := len(cluster.conns)
 	cluster.mu.Unlock()
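
The 1 second sleep replaces the earlier 20 ms wait because, with the debounce in place, the background fillPool goroutine may first need to win the token before it can reconnect, so the test gives it ample time before counting connections. A fixed sleep is timing-sensitive; purely as an illustration (not part of this commit), a bounded poll avoids guessing the delay. Here connCount is a hypothetical helper standing in for the locked read of len(cluster.conns):

	import "time"

	// waitForConns polls until connCount reports want connections or the
	// deadline passes, as an alternative to a fixed sleep. connCount is a
	// hypothetical stand-in for reading len(cluster.conns) under cluster.mu.
	func waitForConns(connCount func() int, want int, deadline time.Duration) bool {
		expire := time.After(deadline)
		for {
			if connCount() == want {
				return true
			}
			select {
			case <-expire:
				return false
			case <-time.After(10 * time.Millisecond):
				// poll again
			}
		}
	}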