|
|
@@ -297,6 +297,9 @@ type tokenAwareHostPolicy struct {
|
|
|
}
|
|
|
|
|
|
func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) {
|
|
|
+ t.mu.Lock()
|
|
|
+ defer t.mu.Unlock()
|
|
|
+
|
|
|
if t.partitioner != partitioner {
|
|
|
t.fallback.SetPartitioner(partitioner)
|
|
|
t.partitioner = partitioner
|
|
|
@@ -306,6 +309,9 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) {
|
|
|
}
|
|
|
|
|
|
func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) {
|
|
|
+ t.mu.Lock()
|
|
|
+ defer t.mu.Unlock()
|
|
|
+
|
|
|
t.hosts.add(host)
|
|
|
t.fallback.AddHost(host)
|
|
|
|
|
|
@@ -313,6 +319,9 @@ func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) {
|
|
|
}
|
|
|
|
|
|
func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) {
|
|
|
+ t.mu.Lock()
|
|
|
+ defer t.mu.Unlock()
|
|
|
+
|
|
|
t.hosts.remove(host.ConnectAddress())
|
|
|
t.fallback.RemoveHost(host)
|
|
|
|
|
|
@@ -328,9 +337,6 @@ func (t *tokenAwareHostPolicy) HostDown(host *HostInfo) {
|
|
|
}
|
|
|
|
|
|
func (t *tokenAwareHostPolicy) resetTokenRing() {
|
|
|
- t.mu.Lock()
|
|
|
- defer t.mu.Unlock()
|
|
|
-
|
|
|
if t.partitioner == "" {
|
|
|
// partitioner not yet set
|
|
|
return
|