Browse Source

policies: add IsLocal (#1041)

Add method for host selection policies to indicate if a node should be
considered local. Currently this is only used when using the DCAwareRR
policy with a TokenAware policy.
Chris Bannister 8 years ago
parent
commit
b3286acf00
1 changed files with 17 additions and 10 deletions
  1. 17 10
      policies.go

+ 17 - 10
policies.go

@@ -212,6 +212,7 @@ type HostSelectionPolicy interface {
 	SetPartitioner
 	KeyspaceChanged(KeyspaceUpdateEvent)
 	Init(*Session)
+	IsLocal(host *HostInfo) bool
 	//Pick returns an iteration function over selected hosts
 	Pick(ExecutableQuery) NextHost
 }
@@ -246,6 +247,7 @@ type roundRobinHostPolicy struct {
 	mu    sync.RWMutex
 }
 
+func (r *roundRobinHostPolicy) IsLocal(*HostInfo) bool              { return true }
 func (r *roundRobinHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {}
 func (r *roundRobinHostPolicy) SetPartitioner(partitioner string)   {}
 func (r *roundRobinHostPolicy) Init(*Session)                       {}
@@ -326,6 +328,10 @@ func (t *tokenAwareHostPolicy) Init(s *Session) {
 	t.session = s
 }
 
+func (t *tokenAwareHostPolicy) IsLocal(host *HostInfo) bool {
+	return t.fallback.IsLocal(host)
+}
+
 func (t *tokenAwareHostPolicy) KeyspaceChanged(update KeyspaceUpdateEvent) {
 	meta, _ := t.keyspaces.Load().(*keyspaceMeta)
 	var size = 1
@@ -463,20 +469,16 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
 		i            int
 	)
 
-	used := make(map[*HostInfo]bool)
+	used := make(map[*HostInfo]bool, len(replicas))
 	return func() SelectedHost {
 		for i < len(replicas) {
 			h := replicas[i]
 			i++
 
-			if !h.IsUp() {
-				// TODO: need a way to handle host distance, as we may want to not
-				// use hosts in specific DC's
-				continue
+			if h.IsUp() && t.fallback.IsLocal(h) {
+				used[h] = true
+				return (*selectedHost)(h)
 			}
-			used[h] = true
-
-			return (*selectedHost)(h)
 		}
 
 		if fallbackIter == nil {
@@ -521,6 +523,7 @@ type hostPoolHostPolicy struct {
 func (r *hostPoolHostPolicy) Init(*Session)                       {}
 func (r *hostPoolHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {}
 func (r *hostPoolHostPolicy) SetPartitioner(string)               {}
+func (r *hostPoolHostPolicy) IsLocal(*HostInfo) bool              { return true }
 
 func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) {
 	peers := make([]string, len(hosts))
@@ -650,10 +653,14 @@ func DCAwareRoundRobinPolicy(localDC string) HostSelectionPolicy {
 	return &dcAwareRR{local: localDC}
 }
 
-func (r *dcAwareRR) Init(*Session)                       {}
-func (r *dcAwareRR) KeyspaceChanged(KeyspaceUpdateEvent) {}
+func (d *dcAwareRR) Init(*Session)                       {}
+func (d *dcAwareRR) KeyspaceChanged(KeyspaceUpdateEvent) {}
 func (d *dcAwareRR) SetPartitioner(p string)             {}
 
+func (d *dcAwareRR) IsLocal(host *HostInfo) bool {
+	return host.DataCenter() == d.local
+}
+
 func (d *dcAwareRR) AddHost(host *HostInfo) {
 	if host.DataCenter() == d.local {
 		d.localHosts.add(host)