Переглянути джерело

Merge pull request #510 from balboah/connection-pool-deadlock

Connection pool deadlock / invalid discovery rpc address
Chris Bannister 10 роки тому
батько
коміт
5ea083a90b
3 змінених файлів з 7 додано та 9 видалено
  1. 3 4
      connectionpool.go
  2. 2 2
      host_source.go
  3. 2 3
      policies.go

+ 3 - 4
connectionpool.go

@@ -121,6 +121,7 @@ func newPolicyConnPool(session *Session, hostPolicy HostSelectionPolicy,
 
 func (p *policyConnPool) SetHosts(hosts []HostInfo) {
 	p.mu.Lock()
+	defer p.mu.Unlock()
 
 	toRemove := make(map[string]struct{})
 	for addr := range p.hostConnPools {
@@ -159,7 +160,6 @@ func (p *policyConnPool) SetHosts(hosts []HostInfo) {
 	// update the policy
 	p.hostPolicy.SetHosts(hosts)
 
-	p.mu.Unlock()
 }
 
 func (p *policyConnPool) SetPartitioner(partitioner string) {
@@ -186,6 +186,7 @@ func (p *policyConnPool) Pick(qry *Query) (SelectedHost, *Conn) {
 	)
 
 	p.mu.RLock()
+	defer p.mu.RUnlock()
 	for conn == nil {
 		host = nextHost()
 		if host == nil {
@@ -201,13 +202,12 @@ func (p *policyConnPool) Pick(qry *Query) (SelectedHost, *Conn) {
 
 		conn = pool.Pick(qry)
 	}
-
-	p.mu.RUnlock()
 	return host, conn
 }
 
 func (p *policyConnPool) Close() {
 	p.mu.Lock()
+	defer p.mu.Unlock()
 
 	// remove the hosts from the policy
 	p.hostPolicy.SetHosts([]HostInfo{})
@@ -217,7 +217,6 @@ func (p *policyConnPool) Close() {
 		delete(p.hostConnPools, addr)
 		pool.Close()
 	}
-	p.mu.Unlock()
 }
 
 // hostConnPool is a connection pool for a single host.

+ 2 - 2
host_source.go

@@ -35,7 +35,7 @@ type ringDescriber struct {
 }
 
 func checkSystemLocal(control *controlConn) (bool, error) {
-	iter := control.query("SELECT rpc_address FROM system.local")
+	iter := control.query("SELECT broadcast_address FROM system.local")
 	if err := iter.err; err != nil {
 		if errf, ok := err.(*errorFrame); ok {
 			if errf.code == errSyntax {
@@ -58,7 +58,7 @@ func (r *ringDescriber) GetHosts() (hosts []HostInfo, partitioner string, err er
 	const (
 		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner FROM system.local"
 		// only supported in 2.2.0, 2.1.6, 2.0.16
-		localQuery = "SELECT rpc_address, data_center, rack, host_id, tokens, partitioner FROM system.local"
+		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner FROM system.local"
 	)
 
 	var localHost HostInfo

+ 2 - 3
policies.go

@@ -94,11 +94,11 @@ func (r *roundRobinHostPolicy) SetPartitioner(partitioner string) {
 func (r *roundRobinHostPolicy) Pick(qry *Query) NextHost {
 	// i is used to limit the number of attempts to find a host
 	// to the number of hosts known to this policy
-	var i uint32 = 0
+	var i uint32
 	return func() SelectedHost {
 		r.mu.RLock()
+		defer r.mu.RUnlock()
 		if len(r.hosts) == 0 {
-			r.mu.RUnlock()
 			return nil
 		}
 
@@ -110,7 +110,6 @@ func (r *roundRobinHostPolicy) Pick(qry *Query) NextHost {
 			host = &r.hosts[(pos)%uint32(len(r.hosts))]
 			i++
 		}
-		r.mu.RUnlock()
 		return selectedRoundRobinHost{host}
 	}
 }