Browse Source

hostsource: remove hosts no longer in peers (#952)

When refreshing the ring remove hosts from the drivers ring cache to
prevent it from trying to connect to removed hosts if it does not
receive a down host event.
Chris Bannister 8 years ago
parent
commit
77431609f5
3 changed files with 26 additions and 1 deletions
  1. 10 1
      host_source.go
  2. 10 0
      ring.go
  3. 6 0
      session.go

+ 10 - 1
host_source.go

@@ -659,8 +659,9 @@ func (r *ringDescriber) refreshRing() error {
 		return err
 		return err
 	}
 	}
 
 
+	prevHosts := r.session.ring.currentHosts()
+
 	// TODO: move this to session
 	// TODO: move this to session
-	// TODO: handle removing hosts here
 	for _, h := range hosts {
 	for _, h := range hosts {
 		if host, ok := r.session.ring.addHostIfMissing(h); !ok {
 		if host, ok := r.session.ring.addHostIfMissing(h); !ok {
 			r.session.pool.addHost(h)
 			r.session.pool.addHost(h)
@@ -668,6 +669,14 @@ func (r *ringDescriber) refreshRing() error {
 		} else {
 		} else {
 			host.update(h)
 			host.update(h)
 		}
 		}
+		delete(prevHosts, h.ConnectAddress().String())
+	}
+
+	// TODO(zariel): it may be worth having a mutex covering the overall ring state
+	// in a session so that everything sees a consistent state. Becuase as is today
+	// events can come in and due to ordering an UP host could be removed from the cluster
+	for _, host := range prevHosts {
+		r.session.removeHost(host)
 	}
 	}
 
 
 	r.session.metadata.setPartitioner(partitioner)
 	r.session.metadata.setPartitioner(partitioner)

+ 10 - 0
ring.go

@@ -53,6 +53,16 @@ func (r *ring) allHosts() []*HostInfo {
 	return hosts
 	return hosts
 }
 }
 
 
+func (r *ring) currentHosts() map[string]*HostInfo {
+	r.mu.RLock()
+	hosts := make(map[string]*HostInfo, len(r.hosts))
+	for k, v := range r.hosts {
+		hosts[k] = v
+	}
+	r.mu.RUnlock()
+	return hosts
+}
+
 func (r *ring) addHost(host *HostInfo) bool {
 func (r *ring) addHost(host *HostInfo) bool {
 	if host.invalidConnectAddr() {
 	if host.invalidConnectAddr() {
 		panic(fmt.Sprintf("invalid host: %v", host))
 		panic(fmt.Sprintf("invalid host: %v", host))

+ 6 - 0
session.go

@@ -384,6 +384,12 @@ func (s *Session) executeQuery(qry *Query) *Iter {
 	return iter
 	return iter
 }
 }
 
 
+func (s *Session) removeHost(h *HostInfo) {
+	s.policy.RemoveHost(h)
+	s.pool.removeHost(h.ConnectAddress())
+	s.ring.removeHost(h.ConnectAddress())
+}
+
 // KeyspaceMetadata returns the schema metadata for the keyspace specified. Returns an error if the keyspace does not exist.
 // KeyspaceMetadata returns the schema metadata for the keyspace specified. Returns an error if the keyspace does not exist.
 func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) {
 func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) {
 	// fail fast
 	// fail fast