|
|
@@ -13,7 +13,7 @@ import (
|
|
|
"github.com/hailocab/go-hostpool"
|
|
|
)
|
|
|
|
|
|
-// cowHostList implements a copy on write host list, its equivilent type is []*HostInfo
|
|
|
+// cowHostList implements a copy on write host list, its equivalent type is []*HostInfo
|
|
|
type cowHostList struct {
|
|
|
list atomic.Value
|
|
|
mu sync.Mutex
|
|
|
@@ -263,9 +263,6 @@ type tokenAwareHostPolicy struct {
|
|
|
}
|
|
|
|
|
|
func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) {
|
|
|
- t.mu.Lock()
|
|
|
- defer t.mu.Unlock()
|
|
|
-
|
|
|
if t.partitioner != partitioner {
|
|
|
t.fallback.SetPartitioner(partitioner)
|
|
|
t.partitioner = partitioner
|
|
|
@@ -278,18 +275,14 @@ func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) {
|
|
|
t.hosts.add(host)
|
|
|
t.fallback.AddHost(host)
|
|
|
|
|
|
- t.mu.Lock()
|
|
|
t.resetTokenRing()
|
|
|
- t.mu.Unlock()
|
|
|
}
|
|
|
|
|
|
func (t *tokenAwareHostPolicy) RemoveHost(addr string) {
|
|
|
t.hosts.remove(addr)
|
|
|
t.fallback.RemoveHost(addr)
|
|
|
|
|
|
- t.mu.Lock()
|
|
|
t.resetTokenRing()
|
|
|
- t.mu.Unlock()
|
|
|
}
|
|
|
|
|
|
func (t *tokenAwareHostPolicy) HostUp(host *HostInfo) {
|
|
|
@@ -301,6 +294,9 @@ func (t *tokenAwareHostPolicy) HostDown(addr string) {
|
|
|
}
|
|
|
|
|
|
func (t *tokenAwareHostPolicy) resetTokenRing() {
|
|
|
+ t.mu.Lock()
|
|
|
+ defer t.mu.Unlock()
|
|
|
+
|
|
|
if t.partitioner == "" {
|
|
|
// partitioner not yet set
|
|
|
return
|
|
|
@@ -377,7 +373,7 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
|
|
|
// // Create host selection policy using a simple host pool
|
|
|
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(hostpool.New(nil))
|
|
|
//
|
|
|
-// // Create host selection policy using an epsilon greddy pool
|
|
|
+// // Create host selection policy using an epsilon greedy pool
|
|
|
// cluster.PoolConfig.HostSelectionPolicy = HostPoolHostPolicy(
|
|
|
// hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
|
|
|
// )
|
|
|
@@ -411,18 +407,20 @@ func (r *hostPoolHostPolicy) AddHost(host *HostInfo) {
|
|
|
r.mu.Lock()
|
|
|
defer r.mu.Unlock()
|
|
|
|
|
|
- if _, ok := r.hostMap[host.Peer()]; ok {
|
|
|
+ // If the host addr is present and isn't nil return
|
|
|
+ if h, ok := r.hostMap[host.Peer()]; ok && h != nil{
|
|
|
return
|
|
|
}
|
|
|
-
|
|
|
- hosts := make([]string, 0, len(r.hostMap)+1)
|
|
|
+ // otherwise, add the host to the map
|
|
|
+ r.hostMap[host.Peer()] = host
|
|
|
+ // and construct a new peer list to give to the HostPool
|
|
|
+ hosts := make([]string, 0, len(r.hostMap))
|
|
|
for addr := range r.hostMap {
|
|
|
hosts = append(hosts, addr)
|
|
|
}
|
|
|
- hosts = append(hosts, host.Peer())
|
|
|
|
|
|
r.hp.SetHosts(hosts)
|
|
|
- r.hostMap[host.Peer()] = host
|
|
|
+
|
|
|
}
|
|
|
|
|
|
func (r *hostPoolHostPolicy) RemoveHost(addr string) {
|