Browse Source

Merge branch 'issue-743-fix' of https://github.com/CrowdStrike/gocql into CrowdStrike-issue-743-fix

Chris Bannister 9 years ago
parent
commit
71387800e5
5 changed files with 19 additions and 20 deletions
  1. 1 0
      AUTHORS
  2. 1 1
      host_source.go
  3. 12 14
      policies.go
  4. 4 4
      policies_test.go
  5. 1 1
      query_executor.go

+ 1 - 0
AUTHORS

@@ -75,3 +75,4 @@ Caleb Doxsey <caleb@datadoghq.com>
 Frederic Hemery <frederic.hemery@datadoghq.com>
 Pekka Enberg <penberg@scylladb.com>
 Bartosz Burclaf <burclaf@gmail.com>
+Marcus King <marcusking01@gmail.com>

+ 1 - 1
host_source.go

@@ -234,7 +234,7 @@ func (h *HostInfo) update(from *HostInfo) {
 }
 
 func (h *HostInfo) IsUp() bool {
-	return h.State() == NodeUp
+	return h != nil && h.State() == NodeUp
 }
 
 func (h *HostInfo) String() string {

+ 12 - 14
policies.go

@@ -13,7 +13,7 @@ import (
 	"github.com/hailocab/go-hostpool"
 )
 
-// cowHostList implements a copy on write host list, its equivilent type is []*HostInfo
+// cowHostList implements a copy on write host list, its equivalent type is []*HostInfo
 type cowHostList struct {
 	list atomic.Value
 	mu   sync.Mutex
@@ -263,9 +263,6 @@ type tokenAwareHostPolicy struct {
 }
 
 func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) {
-	t.mu.Lock()
-	defer t.mu.Unlock()
-
 	if t.partitioner != partitioner {
 		t.fallback.SetPartitioner(partitioner)
 		t.partitioner = partitioner
@@ -278,18 +275,14 @@ func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) {
 	t.hosts.add(host)
 	t.fallback.AddHost(host)
 
-	t.mu.Lock()
 	t.resetTokenRing()
-	t.mu.Unlock()
 }
 
 func (t *tokenAwareHostPolicy) RemoveHost(addr string) {
 	t.hosts.remove(addr)
 	t.fallback.RemoveHost(addr)
 
-	t.mu.Lock()
 	t.resetTokenRing()
-	t.mu.Unlock()
 }
 
 func (t *tokenAwareHostPolicy) HostUp(host *HostInfo) {
@@ -301,6 +294,9 @@ func (t *tokenAwareHostPolicy) HostDown(addr string) {
 }
 
 func (t *tokenAwareHostPolicy) resetTokenRing() {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
 	if t.partitioner == "" {
 		// partitioner not yet set
 		return
@@ -377,7 +373,7 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
 //     // Create host selection policy using a simple host pool
 //     cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(hostpool.New(nil))
 //
-//     // Create host selection policy using an epsilon greddy pool
+//     // Create host selection policy using an epsilon greedy pool
 //     cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(
 //         hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
 //     )
@@ -411,18 +407,20 @@ func (r *hostPoolHostPolicy) AddHost(host *HostInfo) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 
-	if _, ok := r.hostMap[host.Peer()]; ok {
+	// If the host addr is present and isn't nil return
+	if h, ok := r.hostMap[host.Peer()]; ok && h != nil{
 		return
 	}
-
-	hosts := make([]string, 0, len(r.hostMap)+1)
+	// otherwise, add the host to the map
+	r.hostMap[host.Peer()] = host
+	// and construct a new peer list to give to the HostPool
+	hosts := make([]string, 0, len(r.hostMap))
 	for addr := range r.hostMap {
 		hosts = append(hosts, addr)
 	}
-	hosts = append(hosts, host.Peer())
 
 	r.hp.SetHosts(hosts)
-	r.hostMap[host.Peer()] = host
+
 }
 
 func (r *hostPoolHostPolicy) RemoveHost(addr string) {

+ 4 - 4
policies_test.go

@@ -111,14 +111,14 @@ func TestTokenAwareHostPolicy(t *testing.T) {
 func TestHostPoolHostPolicy(t *testing.T) {
 	policy := HostPoolHostPolicy(hostpool.New(nil))
 
-	hosts := [...]*HostInfo{
+	hosts := []*HostInfo{
 		{hostId: "0", peer: "0"},
 		{hostId: "1", peer: "1"},
 	}
 
-	for _, host := range hosts {
-		policy.AddHost(host)
-	}
+	// Using set host to control the ordering of the hosts as calling "AddHost" iterates the map
+	// which will result in an unpredictable ordering
+	policy.(*hostPoolHostPolicy).SetHosts(hosts)
 
 	// the first host selected is actually at [1], but this is ok for RR
 	// interleaved iteration should always increment the host

+ 1 - 1
query_executor.go

@@ -24,7 +24,7 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
 	var iter *Iter
 	for hostResponse := hostIter(); hostResponse != nil; hostResponse = hostIter() {
 		host := hostResponse.Info()
-		if !host.IsUp() {
+		if host == nil || !host.IsUp() {
 			continue
 		}