فهرست منبع

Merge pull request #577 from lovoo/concurrent-connect

Concurrent connect to hosts
Chris Bannister 10 سال پیش
والد
کامیت
fd8f3f0e79
2فایلهای تغییر یافته به همراه47 افزوده شده و 25 حذف شده
  1. 1 0
      AUTHORS
  2. 46 25
      connectionpool.go

+ 1 - 0
AUTHORS

@@ -62,3 +62,4 @@ Adriano Orioli <orioli.adriano@gmail.com>
 Claudiu Raveica <claudiu.raveica@gmail.com>
 Artem Chernyshev <artem.0xD2@gmail.com>
 Ference Fu <fym201@msn.com>
+LOVOO <opensource@lovoo.com>

+ 46 - 25
connectionpool.go

@@ -128,13 +128,23 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
 		toRemove[addr] = struct{}{}
 	}
 
-	// TODO connect to hosts in parallel, but wait for pools to be
-	// created before returning
+	pools := make(chan *hostConnPool)
+	createCount := 0
 	for _, host := range hosts {
-		pool, exists := p.hostConnPools[host.Peer()]
-		if !exists && host.IsUp() {
+		if !host.IsUp() {
+			// don't create a connection pool for a down host
+			continue
+		}
+		if _, exists := p.hostConnPools[host.Peer()]; exists {
+			// still have this host, so don't remove it
+			delete(toRemove, host.Peer())
+			continue
+		}
+
+		createCount++
+		go func(host *HostInfo) {
 			// create a connection pool for the host
-			pool = newHostConnPool(
+			pools <- newHostConnPool(
 				p.session,
 				host,
 				p.port,
@@ -142,10 +152,16 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
 				p.keyspace,
 				p.connPolicy(),
 			)
-			p.hostConnPools[host.Peer()] = pool
-		} else {
-			// still have this host, so don't remove it
-			delete(toRemove, host.Peer())
+		}(host)
+	}
+
+	// add created pools
+	for createCount > 0 {
+		pool := <-pools
+		createCount--
+		if pool.Size() > 0 {
+			// add pool onyl if there a connections available
+			p.hostConnPools[pool.host.Peer()] = pool
 		}
 	}
 
@@ -421,15 +437,8 @@ func (pool *hostConnPool) fill() {
 
 		// filled one
 		fillCount--
-
-		// connect all connections to this host in sync
-		for fillCount > 0 {
-			err := pool.connect()
-			pool.logConnectErr(err)
-
-			// decrement, even on error
-			fillCount--
-		}
+		// connect all remaining connections to this host
+		pool.connectMany(fillCount)
 
 		pool.fillingStopped()
 		return
@@ -437,13 +446,7 @@ func (pool *hostConnPool) fill() {
 
 	// fill the rest of the pool asynchronously
 	go func() {
-		for fillCount > 0 {
-			err := pool.connect()
-			pool.logConnectErr(err)
-
-			// decrement, even on error
-			fillCount--
-		}
+		pool.connectMany(fillCount)
 
 		// mark the end of filling
 		pool.fillingStopped()
@@ -472,6 +475,24 @@ func (pool *hostConnPool) fillingStopped() {
 	pool.mu.Unlock()
 }
 
+// connectMany creates new connections concurrent.
+func (pool *hostConnPool) connectMany(count int) {
+	if count == 0 {
+		return
+	}
+	var wg sync.WaitGroup
+	wg.Add(count)
+	for i := 0; i < count; i++ {
+		go func() {
+			defer wg.Done()
+			err := pool.connect()
+			pool.logConnectErr(err)
+		}()
+	}
+	// wait for all connections are done
+	wg.Wait()
+}
+
 // create a new connection to the host and add it to the pool
 func (pool *hostConnPool) connect() error {
 	// try to connect