Переглянути джерело

session: bulk add hosts to TokenAwarePolicy on init (#1354)

Adding a new host to the TokenAwarePolicy requires recomputing the
replica map, previously we would add each host one at a time which
requires regenerating the replica map for each host and discarding the
old one. Fix this to only calcaulte it once during init after adding all
the hosts in a bulk operation.
Chris Bannister 6 роки тому
батько
коміт
4c7aea974b
2 змінених файлів з 39 додано та 1 видалено
  1. 19 0
      policies.go
  2. 20 1
      session.go

+ 19 - 0
policies.go

@@ -496,6 +496,25 @@ func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) {
 	t.fallback.AddHost(host)
 }
 
+func (t *tokenAwareHostPolicy) AddHosts(hosts []*HostInfo) {
+	t.mu.Lock()
+
+	for _, host := range hosts {
+		t.hosts.add(host)
+	}
+
+	meta := t.getMetadataForUpdate()
+	meta.resetTokenRing(t.partitioner, t.hosts.get())
+	t.updateReplicas(meta, t.getKeyspaceName())
+	t.metadata.Store(meta)
+
+	t.mu.Unlock()
+
+	for _, host := range hosts {
+		t.fallback.AddHost(host)
+	}
+}
+
 func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) {
 	t.mu.Lock()
 	if t.hosts.remove(host.ConnectAddress()) {

+ 20 - 1
session.go

@@ -221,9 +221,28 @@ func (s *Session) init() error {
 		hostMap[host.ConnectAddress().String()] = host
 	}
 
+	hosts = hosts[:0]
 	for _, host := range hostMap {
 		host = s.ring.addOrUpdate(host)
-		s.addNewNode(host)
+		if s.cfg.filterHost(host) {
+			continue
+		}
+
+		host.setState(NodeUp)
+		s.pool.addHost(host)
+
+		hosts = append(hosts, host)
+	}
+
+	type bulkAddHosts interface {
+		AddHosts([]*HostInfo)
+	}
+	if v, ok := s.policy.(bulkAddHosts); ok {
+		v.AddHosts(hosts)
+	} else {
+		for _, host := range hosts {
+			s.policy.AddHost(host)
+		}
 	}
 
 	// TODO(zariel): we probably dont need this any more as we verify that we