
Merge pull request #669 from retailnext/fix-synchronous-pool-filling

Fix 100ms sleep when creating session.
Chris Bannister, 9 years ago
Commit f8d117a884
2 changed files with 40 additions and 22 deletions
  1. cassandra_test.go (+16, -6)
  2. connectionpool.go (+24, -16)
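In short: fillingStopped used to sleep for a random 31-130ms on every call, and the startup path in fill() also connected every remaining connection synchronously before returning, so each new session paid that delay up front. After this change the startup path only makes the initial connection synchronously, the rest of the pool fills in a goroutine, and the back-off sleep happens only when connectMany actually reports a failure. A condensed sketch of the new flow (simplified from the connectionpool.go hunks below, not the literal code):

	// fill the rest of the pool asynchronously
	go func() {
		// connectMany now returns the last dial error it saw, or nil if every connection succeeded
		err := pool.connectMany(fillCount)

		// back off before the next fill attempt only when this one failed,
		// so a clean startup no longer sleeps
		pool.fillingStopped(err != nil)
	}()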

cassandra_test.go (+16, -6)

@@ -88,9 +88,6 @@ func TestInvalidPeerEntry(t *testing.T) {
 		"169.254.235.45",
 	)
 
-	// clean up naughty peer
-	defer session.Query("DELETE from system.peers where peer == ?", "169.254.235.45").Exec()
-
 	if err := query.Exec(); err != nil {
 		t.Fatal(err)
 	}
@@ -100,7 +97,10 @@ func TestInvalidPeerEntry(t *testing.T) {
 	cluster := createCluster()
 	cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(RoundRobinHostPolicy())
 	session = createSessionFromCluster(cluster, t)
-	defer session.Close()
+	defer func() {
+		session.Query("DELETE from system.peers where peer = ?", "169.254.235.45").Exec()
+		session.Close()
+	}()
 
 	// check we can perform a query
 	iter := session.Query("select peer from system.peers").Iter()
@@ -1920,8 +1920,18 @@ func TestTokenAwareConnPool(t *testing.T) {
 	session := createSessionFromCluster(cluster, t)
 	defer session.Close()
 
-	if expected := cluster.NumConns * len(session.ring.allHosts()); session.pool.Size() != expected {
-		t.Errorf("Expected pool size %d but was %d", expected, session.pool.Size())
+	expectedPoolSize := cluster.NumConns * len(session.ring.allHosts())
+
+	// wait for pool to fill
+	for i := 0; i < 10; i++ {
+		if session.pool.Size() == expectedPoolSize {
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	if expectedPoolSize != session.pool.Size() {
+		t.Errorf("Expected pool size %d but was %d", expectedPoolSize, session.pool.Size())
 	}
 
 	// add another cf so there are two pages when fetching table metadata from our keyspace
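Because the pool now fills asynchronously, it is not guaranteed to be at full size the moment createSessionFromCluster returns, which is why the test above polls before asserting. A reusable form of that wait might look like the following rough sketch (waitForPoolSize is illustrative only, not part of gocql; assumes the standard time package is imported):

	// waitForPoolSize polls up to `attempts` times, 100ms apart, and reports
	// whether the pool reached the expected size.
	func waitForPoolSize(pool interface{ Size() int }, want, attempts int) bool {
		for i := 0; i < attempts; i++ {
			if pool.Size() == want {
				return true
			}
			time.Sleep(100 * time.Millisecond)
		}
		return pool.Size() == want
	}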

connectionpool.go (+24, -16)

@@ -429,7 +429,7 @@ func (pool *hostConnPool) fill() {
 
 		if err != nil {
 			// probably unreachable host
-			pool.fillingStopped()
+			pool.fillingStopped(true)
 
 			// this is called with the connection pool mutex held, this call will
 			// then recursively try to lock it again. FIXME
@@ -439,19 +439,14 @@ func (pool *hostConnPool) fill() {
 
 		// filled one
 		fillCount--
-		// connect all remaining connections to this host
-		pool.connectMany(fillCount)
-
-		pool.fillingStopped()
-		return
 	}
 
 	// fill the rest of the pool asynchronously
 	go func() {
-		pool.connectMany(fillCount)
+		err := pool.connectMany(fillCount)
 
 		// mark the end of filling
-		pool.fillingStopped()
+		pool.fillingStopped(err != nil)
 	}()
 }
 
@@ -469,11 +464,13 @@ func (pool *hostConnPool) logConnectErr(err error) {
 }
 
 // transition back to a not-filling state.
-func (pool *hostConnPool) fillingStopped() {
-	// wait for some time to avoid back-to-back filling
-	// this provides some time between failed attempts
-	// to fill the pool for the host to recover
-	time.Sleep(time.Duration(rand.Int31n(100)+31) * time.Millisecond)
+func (pool *hostConnPool) fillingStopped(hadError bool) {
+	if hadError {
+		// wait for some time to avoid back-to-back filling
+		// this provides some time between failed attempts
+		// to fill the pool for the host to recover
+		time.Sleep(time.Duration(rand.Int31n(100)+31) * time.Millisecond)
+	}
 
 	pool.mu.Lock()
 	pool.filling = false
@@ -481,21 +478,32 @@ func (pool *hostConnPool) fillingStopped() {
 }
 
 // connectMany creates new connections concurrently.
-func (pool *hostConnPool) connectMany(count int) {
+func (pool *hostConnPool) connectMany(count int) error {
 	if count == 0 {
-		return
+		return nil
 	}
-	var wg sync.WaitGroup
+	var (
+		wg         sync.WaitGroup
+		mu         sync.Mutex
+		connectErr error
+	)
 	wg.Add(count)
 	for i := 0; i < count; i++ {
 		go func() {
 			defer wg.Done()
 			err := pool.connect()
 			pool.logConnectErr(err)
+			if err != nil {
+				mu.Lock()
+				connectErr = err
+				mu.Unlock()
+			}
 		}()
 	}
 	// wait until all connections are done
 	wg.Wait()
+
+	return connectErr
 }
 
 // create a new connection to the host and add it to the pool
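One detail in the new connectMany: when several of the concurrent dials fail, connectErr simply keeps whichever error was stored last; the mutex only serializes the writes. That is enough for this change, since fillingStopped only cares whether any dial failed, not which one.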