@@ -398,21 +398,27 @@ func TokenAwareHostPolicy(fallback HostSelectionPolicy, opts ...func(*tokenAware
 	return p
 }
 
-type keyspaceMeta struct {
+// clusterMeta holds metadata about cluster topology.
+// It is used inside atomic.Value and shallow copies are used when replacing it,
+// so fields should not be modified in-place. Instead, to modify a field a copy of the field should be made
+// and the pointer in clusterMeta updated to point to the new value.
+type clusterMeta struct {
+	// replicas is map[keyspace]map[token]hosts
 	replicas map[string]map[token][]*HostInfo
+	tokenRing *tokenRing
 }
 
 type tokenAwareHostPolicy struct {
-	hosts cowHostList
-	mu sync.RWMutex
-	partitioner string
 	fallback HostSelectionPolicy
 	session *Session
+	shuffleReplicas bool
 
-	tokenRing atomic.Value // *tokenRing
-	keyspaces atomic.Value // *keyspaceMeta
-
-	shuffleReplicas bool
+	// mu protects writes to hosts, partitioner, metadata.
+	// reads can be unlocked as long as they are not used for updating state later.
+	mu sync.Mutex
+	hosts cowHostList
+	partitioner string
+	metadata atomic.Value // *clusterMeta
 }
 
 func (t *tokenAwareHostPolicy) Init(s *Session) {
@@ -424,40 +430,37 @@ func (t *tokenAwareHostPolicy) IsLocal(host *HostInfo) bool {
 }
 
 func (t *tokenAwareHostPolicy) KeyspaceChanged(update KeyspaceUpdateEvent) {
-	t.updateKeyspaceMetadata(update.Keyspace)
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	meta := t.getMetadataForUpdate()
+	t.updateReplicas(meta, update.Keyspace)
+	t.metadata.Store(meta)
 }
 
-func (t *tokenAwareHostPolicy) updateKeyspaceMetadata(keyspace string) {
-	meta, _ := t.keyspaces.Load().(*keyspaceMeta)
-	var size = 1
-	if meta != nil {
-		size = len(meta.replicas)
-	}
-
-	newMeta := &keyspaceMeta{
-		replicas: make(map[string]map[token][]*HostInfo, size),
-	}
+// updateReplicas updates replicas in clusterMeta.
+// It must be called with t.mu mutex locked.
+// meta must not be nil and its replicas field will be updated.
+func (t *tokenAwareHostPolicy) updateReplicas(meta *clusterMeta, keyspace string) {
+	newReplicas := make(map[string]map[token][]*HostInfo, len(meta.replicas))
 
 	ks, err := t.session.KeyspaceMetadata(keyspace)
 	if err == nil {
 		strat := getStrategy(ks)
 		if strat != nil {
-			tr, _ := t.tokenRing.Load().(*tokenRing)
-			if tr != nil {
-				newMeta.replicas[keyspace] = strat.replicaMap(t.hosts.get(), tr.tokens)
+			if meta != nil && meta.tokenRing != nil {
+				hosts := t.hosts.get()
+				newReplicas[keyspace] = strat.replicaMap(hosts, meta.tokenRing.tokens)
 			}
 		}
 	}
 
-	if meta != nil {
-		for ks, replicas := range meta.replicas {
-			if ks != keyspace {
-				newMeta.replicas[ks] = replicas
-			}
+	for ks, replicas := range meta.replicas {
+		if ks != keyspace {
+			newReplicas[ks] = replicas
 		}
 	}
 
-	t.keyspaces.Store(newMeta)
+	meta.replicas = newReplicas
 }
 
 func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) {
@@ -467,53 +470,83 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) {
 	if t.partitioner != partitioner {
 		t.fallback.SetPartitioner(partitioner)
 		t.partitioner = partitioner
-
-		t.resetTokenRing(partitioner)
+		meta := t.getMetadataForUpdate()
+		meta.resetTokenRing(t.partitioner, t.hosts.get())
+		if t.session != nil { // disable for unit tests
+			t.updateReplicas(meta, t.session.cfg.Keyspace)
+		}
+		t.metadata.Store(meta)
 	}
 }
 
 func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) {
-	t.HostUp(host)
-	if t.session != nil { // disable for unit tests
-		t.updateKeyspaceMetadata(t.session.cfg.Keyspace)
+	t.mu.Lock()
+	if t.hosts.add(host) {
+		meta := t.getMetadataForUpdate()
+		meta.resetTokenRing(t.partitioner, t.hosts.get())
+		if t.session != nil { // disable for unit tests
+			t.updateReplicas(meta, t.session.cfg.Keyspace)
+		}
+		t.metadata.Store(meta)
 	}
+	t.mu.Unlock()
+
+	t.fallback.AddHost(host)
 }
 
 func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) {
-	t.HostDown(host)
-	if t.session != nil { // disable for unit tests
-		t.updateKeyspaceMetadata(t.session.cfg.Keyspace)
+	t.mu.Lock()
+	if t.hosts.remove(host.ConnectAddress()) {
+		meta := t.getMetadataForUpdate()
+		meta.resetTokenRing(t.partitioner, t.hosts.get())
+		if t.session != nil { // disable for unit tests
+			t.updateReplicas(meta, t.session.cfg.Keyspace)
+		}
+		t.metadata.Store(meta)
 	}
+	t.mu.Unlock()
+
+	t.fallback.RemoveHost(host)
 }
 
 func (t *tokenAwareHostPolicy) HostUp(host *HostInfo) {
-	t.hosts.add(host)
-	t.fallback.AddHost(host)
-
-	t.mu.RLock()
-	partitioner := t.partitioner
-	t.mu.RUnlock()
-	t.resetTokenRing(partitioner)
+	t.fallback.HostUp(host)
 }
 
 func (t *tokenAwareHostPolicy) HostDown(host *HostInfo) {
-	t.hosts.remove(host.ConnectAddress())
-	t.fallback.RemoveHost(host)
+	t.fallback.HostDown(host)
+}
 
-	t.mu.RLock()
-	partitioner := t.partitioner
-	t.mu.RUnlock()
-	t.resetTokenRing(partitioner)
+// getMetadataReadOnly returns current cluster metadata.
+// Metadata uses copy-on-write, so the returned value should only be used for reading.
+// To obtain a copy that could be updated, use getMetadataForUpdate instead.
+func (t *tokenAwareHostPolicy) getMetadataReadOnly() *clusterMeta {
+	meta, _ := t.metadata.Load().(*clusterMeta)
+	return meta
+}
+
+// getMetadataForUpdate returns clusterMeta suitable for updating.
+// It is a SHALLOW copy of current metadata in case it was already set or new empty clusterMeta otherwise.
+// This function should be called with t.mu mutex locked and the mutex should not be released before
+// storing the new metadata.
+func (t *tokenAwareHostPolicy) getMetadataForUpdate() *clusterMeta {
+	metaReadOnly := t.getMetadataReadOnly()
+	meta := new(clusterMeta)
+	if metaReadOnly != nil {
+		*meta = *metaReadOnly
+	}
+	return meta
 }
 
-func (t *tokenAwareHostPolicy) resetTokenRing(partitioner string) {
+// resetTokenRing creates a new tokenRing.
+// It must be called with t.mu locked.
+func (m *clusterMeta) resetTokenRing(partitioner string, hosts []*HostInfo) {
 	if partitioner == "" {
 		// partitioner not yet set
 		return
 	}
 
 	// create a new token ring
-	hosts := t.hosts.get()
 	tokenRing, err := newTokenRing(partitioner, hosts)
 	if err != nil {
 		Logger.Printf("Unable to update the token ring due to error: %s", err)
@@ -521,15 +554,14 @@ func (t *tokenAwareHostPolicy) resetTokenRing(partitioner string) {
 	}
 
 	// replace the token ring
-	t.tokenRing.Store(tokenRing)
+	m.tokenRing = tokenRing
 }
 
-func (t *tokenAwareHostPolicy) getReplicas(keyspace string, token token) ([]*HostInfo, bool) {
-	meta, _ := t.keyspaces.Load().(*keyspaceMeta)
-	if meta == nil {
+func (m *clusterMeta) getReplicas(keyspace string, token token) ([]*HostInfo, bool) {
+	if m.replicas == nil {
 		return nil, false
 	}
-	replicas, ok := meta.replicas[keyspace][token]
+	replicas, ok := m.replicas[keyspace][token]
 	return replicas, ok
 }
 
@@ -545,17 +577,17 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
 		return t.fallback.Pick(qry)
 	}
 
-	tr, _ := t.tokenRing.Load().(*tokenRing)
-	if tr == nil {
+	meta := t.getMetadataReadOnly()
+	if meta == nil || meta.tokenRing == nil {
 		return t.fallback.Pick(qry)
 	}
 
-	primaryEndpoint, token := tr.GetHostForPartitionKey(routingKey)
+	primaryEndpoint, token := meta.tokenRing.GetHostForPartitionKey(routingKey)
	if primaryEndpoint == nil || token == nil {
 		return t.fallback.Pick(qry)
 	}
 
-	replicas, ok := t.getReplicas(qry.Keyspace(), token)
+	replicas, ok := meta.getReplicas(qry.Keyspace(), token)
 	if !ok {
 		replicas = []*HostInfo{primaryEndpoint}
 	} else if t.shuffleReplicas {
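Below is a minimal standalone sketch (not part of the patch) of the copy-on-write pattern the new clusterMeta comments describe: readers load the current snapshot from an atomic.Value without taking the mutex, while writers hold the mutex, make a shallow copy, replace only the field they change with a fresh value, and then Store the copy. The identifiers in the sketch (policy, meta, readOnly, forUpdate, addReplica) are hypothetical stand-ins for tokenAwareHostPolicy, clusterMeta, getMetadataReadOnly and getMetadataForUpdate, not gocql APIs.

// Illustrative sketch, not gocql code: copy-on-write metadata guarded by a
// writer-side mutex and published through atomic.Value.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type meta struct {
	// replicas must never be mutated in place once the meta is published;
	// writers build a fresh map and swap the whole *meta pointer.
	replicas map[string][]string
}

type policy struct {
	mu       sync.Mutex   // serializes writers only
	metadata atomic.Value // *meta, read lock-free by readers
}

// readOnly returns the currently published metadata; callers must not modify it.
func (p *policy) readOnly() *meta {
	m, _ := p.metadata.Load().(*meta)
	return m
}

// forUpdate returns a shallow copy of the current metadata.
// Must be called with p.mu held, and p.mu must stay held until Store.
func (p *policy) forUpdate() *meta {
	m := new(meta)
	if cur := p.readOnly(); cur != nil {
		*m = *cur
	}
	return m
}

// addReplica publishes a new metadata snapshot with one extra replica.
func (p *policy) addReplica(keyspace, host string) {
	p.mu.Lock()
	defer p.mu.Unlock()

	m := p.forUpdate()
	// Copy the field about to change instead of mutating the shared map.
	newReplicas := make(map[string][]string, len(m.replicas)+1)
	for ks, hosts := range m.replicas {
		newReplicas[ks] = hosts
	}
	newReplicas[keyspace] = append(append([]string(nil), newReplicas[keyspace]...), host)
	m.replicas = newReplicas

	p.metadata.Store(m) // readers atomically switch to the new snapshot
}

func main() {
	p := &policy{}
	p.addReplica("ks1", "10.0.0.1")
	p.addReplica("ks1", "10.0.0.2")
	fmt.Println(p.readOnly().replicas) // map[ks1:[10.0.0.1 10.0.0.2]]
}

The point of the shallow copy is that a reader holding an older *meta never observes a map mutated underneath it; writers pay the copy cost, which is acceptable when topology and keyspace changes are rare compared to read-side calls such as Pick.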