浏览代码

Merge pull request #623 from Zariel/dont-block-pool-dialling

dont fill the host pool whilst holding the connection pools lock
Chris Bannister 9 年之前
父节点
当前提交
414bacaf47
共有 6 个文件被更改,包括 24 次插入25 次删除
  1. 17 18
      connectionpool.go
  2. 2 2
      control.go
  3. 1 1
      debug_off.go
  4. 1 1
      debug_on.go
  5. 2 2
      events.go
  6. 1 1
      integration.sh

+ 17 - 18
connectionpool.go

@@ -241,24 +241,22 @@ func (p *policyConnPool) Close() {
 
 func (p *policyConnPool) addHost(host *HostInfo) {
 	p.mu.Lock()
-	defer p.mu.Unlock()
-
 	pool, ok := p.hostConnPools[host.Peer()]
-	if ok {
-		go pool.fill()
-		return
+	if !ok {
+		pool = newHostConnPool(
+			p.session,
+			host,
+			host.Port(),
+			p.numConns,
+			p.keyspace,
+			p.connPolicy(),
+		)
+
+		p.hostConnPools[host.Peer()] = pool
 	}
+	p.mu.Unlock()
 
-	pool = newHostConnPool(
-		p.session,
-		host,
-		host.Port(),
-		p.numConns,
-		p.keyspace,
-		p.connPolicy(),
-	)
-
-	p.hostConnPools[host.Peer()] = pool
+	pool.fill()
 
 	// update policy
 	// TODO: policy should not have conns, it should have hosts and return a host
@@ -334,9 +332,7 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int,
 		closed:   false,
 	}
 
-	// fill the pool with the initial connections before returning
-	pool.fill()
-
+	// the pool is not filled or connected
 	return pool
 }
 
@@ -463,6 +459,9 @@ func (pool *hostConnPool) logConnectErr(err error) {
 	if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
 		// connection refused
 		// these are typical during a node outage so avoid log spam.
+		if gocqlDebug {
+			log.Printf("unable to dial %q: %v\n", pool.host.Peer(), err)
+		}
 	} else if err != nil {
 		// unexpected error
 		log.Printf("error: failed to connect to %s due to error: %v", pool.addr, err)

+ 2 - 2
control.go

@@ -148,7 +148,7 @@ func (c *controlConn) setupConn(conn *Conn) error {
 		return err
 	}
 
-	go c.session.handleNodeUp(net.ParseIP(host), port, true)
+	c.session.handleNodeUp(net.ParseIP(host), port, false)
 
 	return nil
 }
@@ -299,7 +299,7 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter
 			return conn.executeQuery(q)
 		})
 
-		if debug && iter.err != nil {
+		if gocqlDebug && iter.err != nil {
 			log.Printf("control: error executing %q: %v\n", statement, iter.err)
 		}
 

+ 1 - 1
debug_off.go

@@ -2,4 +2,4 @@
 
 package gocql
 
-const debug = false
+const gocqlDebug = false

+ 1 - 1
debug_on.go

@@ -2,4 +2,4 @@
 
 package gocql
 
-const debug = true
+const gocqlDebug = true

+ 2 - 2
events.go

@@ -91,7 +91,7 @@ func (s *Session) handleEvent(framer *framer) {
 		return
 	}
 
-	if debug {
+	if gocqlDebug {
 		log.Printf("gocql: handling frame: %v\n", frame)
 	}
 
@@ -140,7 +140,7 @@ func (s *Session) handleNodeEvent(frames []frame) {
 	}
 
 	for _, f := range events {
-		if debug {
+		if gocqlDebug {
 			log.Printf("gocql: dispatching event: %+v\n", f)
 		}
 

+ 1 - 1
integration.sh

@@ -65,7 +65,7 @@ function run_tests() {
 		go test -v . -timeout 15s -run=TestAuthentication -tags "integration gocql_debug" -runssl -runauth -proto=$proto -cluster=$(ccm liveset) -clusterSize=$clusterSize -autowait=1000ms
 	else
 
-		go test -timeout 10m -tags integration -v -gocql.timeout=10s -runssl -proto=$proto -rf=3 -cluster=$(ccm liveset) -clusterSize=$clusterSize -autowait=2000ms -compressor=snappy ./...
+		go test -tags "integration gocql_debug" -timeout 10m -v -gocql.timeout=10s -runssl -proto=$proto -rf=3 -cluster=$(ccm liveset) -clusterSize=$clusterSize -autowait=2000ms -compressor=snappy ./...
 
 		if [ ${PIPESTATUS[0]} -ne 0 ]; then
 			echo "--- FAIL: ccm status follows:"