
Fix more hostinfo (#888)

* GetHosts() is now more C* version tolerant

* Will connect even if the cluster has an invalid rpc_address

* Code review improvements

* Only prefer user provided ip address in the control connection case.
* Fixed logic on NodeUp when IgnorePeerAddr is set
Derrick J. Wippler 8 years ago
parent
commit
be48a669b3
19 changed files with 524 additions and 267 deletions
  1. cassandra_test.go (+4 -4)
  2. common_test.go (+3 -3)
  3. conn.go (+3 -3)
  4. connectionpool.go (+7 -7)
  5. control.go (+4 -43)
  6. control_test.go (+2 -2)
  7. events.go (+13 -20)
  8. filters.go (+2 -2)
  9. filters_test.go (+3 -3)
  10. host_source.go (+344 -94)
  11. host_source_test.go (+63 -1)
  12. policies.go (+8 -8)
  13. policies_test.go (+28 -28)
  14. ring.go (+3 -3)
  15. ring_test.go (+3 -3)
  16. session.go (+8 -20)
  17. session_connect_test.go (+4 -4)
  18. token.go (+1 -1)
  19. token_test.go (+21 -18)
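
For readers skimming the diffs below, the central behavioural change is the new connect-address resolution on HostInfo (see host_source.go): an explicitly set connect address wins, then rpc_address unless it is 0.0.0.0, then the peer column from system.peers. A minimal sketch of that order; connectAddressFor is a hypothetical helper written for illustration, not part of the commit.

package main

import (
	"fmt"
	"net"
)

// connectAddressFor mirrors the resolution order of HostInfo.ConnectAddress()
// introduced in this commit: explicit override, then rpc_address (unless it is
// 0.0.0.0), then the peer column from system.peers.
func connectAddressFor(explicit, rpcAddress, peer net.IP) net.IP {
	if explicit != nil {
		return explicit
	}
	if rpcAddress != nil && !rpcAddress.IsUnspecified() {
		return rpcAddress
	}
	return peer
}

func main() {
	// A node misreporting rpc_address as 0.0.0.0 still resolves to its peer address,
	// which is why the driver can now connect despite an invalid rpc_address.
	fmt.Println(connectAddressFor(nil, net.ParseIP("0.0.0.0"), net.ParseIP("10.0.0.2")))
}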

+ 4 - 4
cassandra_test.go

@@ -66,7 +66,7 @@ func TestRingDiscovery(t *testing.T) {
 
 	if *clusterSize != size {
 		for p, pool := range session.pool.hostConnPools {
-			t.Logf("p=%q host=%v ips=%s", p, pool.host, pool.host.Peer().String())
+			t.Logf("p=%q host=%v ips=%s", p, pool.host, pool.host.ConnectAddress().String())
 
 		}
 		t.Errorf("Expected a cluster size of %d, but actual size was %d", *clusterSize, size)
@@ -586,7 +586,7 @@ func TestReconnection(t *testing.T) {
 	defer session.Close()
 
 	h := session.ring.allHosts()[0]
-	session.handleNodeDown(h.Peer(), h.Port())
+	session.handleNodeDown(h.ConnectAddress(), h.Port())
 
 	if h.State() != NodeDown {
 		t.Fatal("Host should be NodeDown but not.")
@@ -2437,8 +2437,8 @@ func TestDiscoverViaProxy(t *testing.T) {
 	session := createSessionFromCluster(cluster, t)
 	defer session.Close()
 
-	if !session.hostSource.localHasRpcAddr {
-		t.Skip("Target cluster does not have rpc_address in system.local.")
+	if session.hostSource.localHost.BroadcastAddress() == nil {
+		t.Skip("Target cluster does not have broadcast_address in system.local.")
 		goto close
 	}
 

+ 3 - 3
common_test.go

@@ -4,11 +4,11 @@ import (
 	"flag"
 	"fmt"
 	"log"
+	"net"
 	"strings"
 	"sync"
 	"testing"
 	"time"
-	"net"
 )
 
 var (
@@ -154,9 +154,9 @@ func createTestSession() *Session {
 	config.IgnorePeerAddr = true
 	config.PoolConfig.HostSelectionPolicy = RoundRobinHostPolicy()
 	session := &Session{
-		cfg:    *config,
+		cfg: *config,
 		connCfg: &ConnConfig{
-			Timeout: 10*time.Millisecond,
+			Timeout:   10 * time.Millisecond,
 			Keepalive: 0,
 		},
 		policy: config.PoolConfig.HostSelectionPolicy,

+ 3 - 3
conn.go

@@ -156,8 +156,8 @@ func Connect(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, ses
 	// TODO(zariel): remove these
 	if host == nil {
 		panic("host is nil")
-	} else if len(host.Peer()) == 0 {
-		panic("host missing peer ip address")
+	} else if len(host.ConnectAddress()) == 0 {
+		panic("host missing connect ip address")
 	} else if host.Port() == 0 {
 		panic("host missing port")
 	}
@@ -172,7 +172,7 @@ func Connect(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, ses
 	}
 
 	// TODO(zariel): handle ipv6 zone
-	translatedPeer, translatedPort := session.cfg.translateAddressPort(host.Peer(), host.Port())
+	translatedPeer, translatedPort := session.cfg.translateAddressPort(host.ConnectAddress(), host.Port())
 	addr := (&net.TCPAddr{IP: translatedPeer, Port: translatedPort}).String()
 	//addr := (&net.TCPAddr{IP: host.Peer(), Port: host.Port()}).String()
 

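Connect() now derives the dial address from ConnectAddress() and still passes it through the cluster's address translation. A short usage sketch, assuming gocql's AddressTranslatorFunc type and the ClusterConfig.AddressTranslator field behind translateAddressPort in this version; all addresses are illustrative.

package main

import (
	"net"

	"github.com/gocql/gocql"
)

func main() {
	cluster := gocql.NewCluster("192.168.1.10")
	// Rewrite a container-internal address to one the client can actually reach
	// before the driver dials it. The mapping is purely illustrative.
	cluster.AddressTranslator = gocql.AddressTranslatorFunc(func(addr net.IP, port int) (net.IP, int) {
		if addr.Equal(net.ParseIP("10.0.0.2")) {
			return net.ParseIP("192.168.1.11"), port
		}
		return addr, port
	})

	session, err := cluster.CreateSession()
	if err != nil {
		panic(err)
	}
	defer session.Close()
}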
+ 7 - 7
connectionpool.go

@@ -132,7 +132,7 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
 			// don't create a connection pool for a down host
 			continue
 		}
-		ip := host.Peer().String()
+		ip := host.ConnectAddress().String()
 		if _, exists := p.hostConnPools[ip]; exists {
 			// still have this host, so don't remove it
 			delete(toRemove, ip)
@@ -158,7 +158,7 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
 		createCount--
 		if pool.Size() > 0 {
 			// add pool only if there a connections available
-			p.hostConnPools[string(pool.host.Peer())] = pool
+			p.hostConnPools[string(pool.host.ConnectAddress())] = pool
 		}
 	}
 
@@ -181,7 +181,7 @@ func (p *policyConnPool) Size() int {
 }
 
 func (p *policyConnPool) getPool(host *HostInfo) (pool *hostConnPool, ok bool) {
-	ip := host.Peer().String()
+	ip := host.ConnectAddress().String()
 	p.mu.RLock()
 	pool, ok = p.hostConnPools[ip]
 	p.mu.RUnlock()
@@ -200,7 +200,7 @@ func (p *policyConnPool) Close() {
 }
 
 func (p *policyConnPool) addHost(host *HostInfo) {
-	ip := host.Peer().String()
+	ip := host.ConnectAddress().String()
 	p.mu.Lock()
 	pool, ok := p.hostConnPools[ip]
 	if !ok {
@@ -278,7 +278,7 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int,
 		session:  session,
 		host:     host,
 		port:     port,
-		addr:     (&net.TCPAddr{IP: host.Peer(), Port: host.Port()}).String(),
+		addr:     (&net.TCPAddr{IP: host.ConnectAddress(), Port: host.Port()}).String(),
 		size:     size,
 		keyspace: keyspace,
 		conns:    make([]*Conn, 0, size),
@@ -402,7 +402,7 @@ func (pool *hostConnPool) fill() {
 
 			// this is call with the connection pool mutex held, this call will
 			// then recursively try to lock it again. FIXME
-			go pool.session.handleNodeDown(pool.host.Peer(), pool.port)
+			go pool.session.handleNodeDown(pool.host.ConnectAddress(), pool.port)
 			return
 		}
 
@@ -424,7 +424,7 @@ func (pool *hostConnPool) logConnectErr(err error) {
 		// connection refused
 		// these are typical during a node outage so avoid log spam.
 		if gocqlDebug {
-			Logger.Printf("unable to dial %q: %v\n", pool.host.Peer(), err)
+			Logger.Printf("unable to dial %q: %v\n", pool.host.ConnectAddress(), err)
 		}
 	} else if err != nil {
 		// unexpected error

+ 4 - 43
control.go

@@ -132,7 +132,7 @@ func hostInfo(addr string, defaultPort int) (*HostInfo, error) {
 
 	}
 
-	return &HostInfo{peer: ip, port: port}, nil
+	return &HostInfo{connectAddress: ip, port: port}, nil
 }
 
 func shuffleHosts(hosts []*HostInfo) []*HostInfo {
@@ -161,7 +161,7 @@ func (c *controlConn) shuffleDial(endpoints []*HostInfo) (*Conn, error) {
 			return conn, nil
 		}
 
-		Logger.Printf("gocql: unable to dial control conn %v: %v\n", host.Peer(), err)
+		Logger.Printf("gocql: unable to dial control conn %v: %v\n", host.ConnectAddress(), err)
 	}
 
 	return nil, err
@@ -314,7 +314,7 @@ func (c *controlConn) reconnect(refreshring bool) {
 		if err != nil {
 			// host is dead
 			// TODO: this is replicated in a few places
-			c.session.handleNodeDown(host.Peer(), host.Port())
+			c.session.handleNodeDown(host.ConnectAddress(), host.Port())
 		} else {
 			newConn = conn
 		}
@@ -420,52 +420,13 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter
 	return
 }
 
-func (c *controlConn) fetchHostInfo(ip net.IP, port int) (*HostInfo, error) {
-	// TODO(zariel): we should probably move this into host_source or atleast
-	// share code with it.
-	localHost := c.host()
-	if localHost == nil {
-		return nil, errors.New("unable to fetch host info, invalid conn host")
-	}
-
-	isLocal := localHost.Peer().Equal(ip)
-
-	var fn func(*HostInfo) error
-
-	// TODO(zariel): fetch preferred_ip address (is it >3.x only?)
-	if isLocal {
-		fn = func(host *HostInfo) error {
-			iter := c.query("SELECT data_center, rack, host_id, tokens, release_version FROM system.local WHERE key='local'")
-			iter.Scan(&host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
-			return iter.Close()
-		}
-	} else {
-		fn = func(host *HostInfo) error {
-			iter := c.query("SELECT data_center, rack, host_id, tokens, release_version FROM system.peers WHERE peer=?", ip)
-			iter.Scan(&host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
-			return iter.Close()
-		}
-	}
-
-	host := &HostInfo{
-		port: port,
-		peer: ip,
-	}
-
-	if err := fn(host); err != nil {
-		return nil, err
-	}
-
-	return host, nil
-}
-
 func (c *controlConn) awaitSchemaAgreement() error {
 	return c.withConn(func(conn *Conn) *Iter {
 		return &Iter{err: conn.awaitSchemaAgreement()}
 	}).err
 }
 
-func (c *controlConn) host() *HostInfo {
+func (c *controlConn) GetHostInfo() *HostInfo {
 	conn := c.conn.Load().(*Conn)
 	if conn == nil {
 		return nil

+ 2 - 2
control_test.go

@@ -24,8 +24,8 @@ func TestHostInfo_Lookup(t *testing.T) {
 			continue
 		}
 
-		if !host.peer.Equal(test.ip) {
-			t.Errorf("expected ip %v got %v for addr %q", test.ip, host.peer, test.addr)
+		if !host.ConnectAddress().Equal(test.ip) {
+			t.Errorf("expected ip %v got %v for addr %q", test.ip, host.ConnectAddress(), test.addr)
 		}
 	}
 }

+ 13 - 20
events.go

@@ -174,23 +174,15 @@ func (s *Session) handleNodeEvent(frames []frame) {
 }
 
 func (s *Session) handleNewNode(ip net.IP, port int, waitForBinary bool) {
-	var hostInfo *HostInfo
-	if s.control != nil && !s.cfg.IgnorePeerAddr {
-		var err error
-		hostInfo, err = s.control.fetchHostInfo(ip, port)
-		if err != nil {
-			Logger.Printf("gocql: events: unable to fetch host info for (%s:%d): %v\n", ip, port, err)
-			return
-		}
-	} else {
-		hostInfo = &HostInfo{peer: ip, port: port}
-	}
-
-	if s.cfg.IgnorePeerAddr && hostInfo.Peer().Equal(ip) {
-		hostInfo.setPeer(ip)
+	// Get host info and apply any filters to the host
+	hostInfo, err := s.hostSource.GetHostInfo(ip, port)
+	if err != nil {
+		Logger.Printf("gocql: events: unable to fetch host info for (%s:%d): %v\n", ip, port, err)
+		return
 	}
 
-	if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(hostInfo) {
+	// If hostInfo is nil, this host was filtered out by cfg.HostFilter
+	if hostInfo == nil {
 		return
 	}
 
@@ -216,7 +208,7 @@ func (s *Session) handleRemovedNode(ip net.IP, port int) {
 	// we remove all nodes but only add ones which pass the filter
 	host := s.ring.getHost(ip)
 	if host == nil {
-		host = &HostInfo{peer: ip, port: port}
+		host = &HostInfo{connectAddress: ip, port: port}
 	}
 
 	if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) {
@@ -240,9 +232,10 @@ func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) {
 
 	host := s.ring.getHost(ip)
 	if host != nil {
-		if s.cfg.IgnorePeerAddr && host.Peer().Equal(ip) {
-			// TODO: how can this ever be true?
-			host.setPeer(ip)
+		// If we receive a node up event and the user has asked us to ignore the peer address, use
+		// the address provided by the event instead of the address provided by the peers table.
+		if s.cfg.IgnorePeerAddr && !host.ConnectAddress().Equal(ip) {
+			host.SetConnectAddress(ip)
 		}
 
 		if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) {
@@ -269,7 +262,7 @@ func (s *Session) handleNodeDown(ip net.IP, port int) {
 
 	host := s.ring.getHost(ip)
 	if host == nil {
-		host = &HostInfo{peer: ip, port: port}
+		host = &HostInfo{connectAddress: ip, port: port}
 	}
 
 	if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) {

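The handleNodeUp change above is the "Fixed logic on NodeUp when IgnorePeerAddr is set" item from the commit message: the old guard only rewrote the address when it already equalled the event address (a no-op), while the new guard rewrites it when the two differ. A standalone sketch of the corrected check, with made-up addresses.

package main

import (
	"fmt"
	"net"
)

func main() {
	ignorePeerAddr := true
	stored := net.ParseIP("10.0.0.7")   // connect address learned from system.peers
	event := net.ParseIP("192.168.1.7") // address carried by the node-up event

	// Corrected guard: only overwrite when the stored address differs from the event's.
	if ignorePeerAddr && !stored.Equal(event) {
		stored = event
	}
	fmt.Println(stored) // 192.168.1.7
}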
+ 2 - 2
filters.go

@@ -48,10 +48,10 @@ func WhiteListHostFilter(hosts ...string) HostFilter {
 
 	m := make(map[string]bool, len(hostInfos))
 	for _, host := range hostInfos {
-		m[string(host.peer)] = true
+		m[string(host.ConnectAddress())] = true
 	}
 
 	return HostFilterFunc(func(host *HostInfo) bool {
-		return m[string(host.Peer())]
+		return m[string(host.ConnectAddress())]
 	})
 }
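WhiteListHostFilter now matches on each host's ConnectAddress() rather than the raw peer field. A short configuration sketch; the addresses are illustrative.

package main

import "github.com/gocql/gocql"

func main() {
	cluster := gocql.NewCluster("127.0.0.1", "127.0.0.2", "127.0.0.3")
	// Hosts whose connect address is not in the white list are never dialled.
	cluster.HostFilter = gocql.WhiteListHostFilter("127.0.0.1", "127.0.0.2")
	// Create the session from this config as usual.
}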

+ 3 - 3
filters_test.go

@@ -17,7 +17,7 @@ func TestFilter_WhiteList(t *testing.T) {
 	}
 
 	for i, test := range tests {
-		if f.Accept(&HostInfo{peer: test.addr}) {
+		if f.Accept(&HostInfo{connectAddress: test.addr}) {
 			if !test.accept {
 				t.Errorf("%d: should not have been accepted but was", i)
 			}
@@ -39,7 +39,7 @@ func TestFilter_AllowAll(t *testing.T) {
 	}
 
 	for i, test := range tests {
-		if f.Accept(&HostInfo{peer: test.addr}) {
+		if f.Accept(&HostInfo{connectAddress: test.addr}) {
 			if !test.accept {
 				t.Errorf("%d: should not have been accepted but was", i)
 			}
@@ -61,7 +61,7 @@ func TestFilter_DenyAll(t *testing.T) {
 	}
 
 	for i, test := range tests {
-		if f.Accept(&HostInfo{peer: test.addr}) {
+		if f.Accept(&HostInfo{connectAddress: test.addr}) {
 			if !test.accept {
 				t.Errorf("%d: should not have been accepted but was", i)
 			}

+ 344 - 94
host_source.go

@@ -1,6 +1,7 @@
 package gocql
 
 import (
+	"errors"
 	"fmt"
 	"net"
 	"strconv"
@@ -98,15 +99,25 @@ func (c cassVersion) nodeUpDelay() time.Duration {
 type HostInfo struct {
 	// TODO(zariel): reduce locking maybe, not all values will change, but to ensure
 	// that we are thread safe use a mutex to access all fields.
-	mu         sync.RWMutex
-	peer       net.IP
-	port       int
-	dataCenter string
-	rack       string
-	hostId     string
-	version    cassVersion
-	state      nodeState
-	tokens     []string
+	mu               sync.RWMutex
+	peer             net.IP
+	broadcastAddress net.IP
+	listenAddress    net.IP
+	rpcAddress       net.IP
+	preferredIP      net.IP
+	connectAddress   net.IP
+	port             int
+	dataCenter       string
+	rack             string
+	hostId           string
+	workload         string
+	graph            bool
+	dseVersion       string
+	partitioner      string
+	clusterName      string
+	version          cassVersion
+	state            nodeState
+	tokens           []string
 }
 
 func (h *HostInfo) Equal(host *HostInfo) bool {
@@ -115,7 +126,7 @@ func (h *HostInfo) Equal(host *HostInfo) bool {
 	host.mu.RLock()
 	defer host.mu.RUnlock()
 
-	return h.peer.Equal(host.peer)
+	return h.ConnectAddress().Equal(host.ConnectAddress())
 }
 
 func (h *HostInfo) Peer() net.IP {
@@ -131,6 +142,57 @@ func (h *HostInfo) setPeer(peer net.IP) *HostInfo {
 	return h
 }
 
+// Returns the address that should be used to connect to the host.
+// If you wish to override this, use an AddressTranslator or
+// use a HostFilter to SetConnectAddress()
+func (h *HostInfo) ConnectAddress() net.IP {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+
+	if h.connectAddress == nil {
+		// Use 'rpc_address' if provided and it's not 0.0.0.0
+		if h.rpcAddress != nil && !h.rpcAddress.IsUnspecified() {
+			return h.rpcAddress
+		}
+		// Peer should always be set if this came from 'system.peers'
+		if h.peer != nil {
+			return h.peer
+		}
+	}
+	return h.connectAddress
+}
+
+func (h *HostInfo) SetConnectAddress(address net.IP) *HostInfo {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	h.connectAddress = address
+	return h
+}
+
+func (h *HostInfo) BroadcastAddress() net.IP {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+	return h.broadcastAddress
+}
+
+func (h *HostInfo) ListenAddress() net.IP {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+	return h.listenAddress
+}
+
+func (h *HostInfo) RPCAddress() net.IP {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+	return h.rpcAddress
+}
+
+func (h *HostInfo) PreferredIP() net.IP {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+	return h.preferredIP
+}
+
 func (h *HostInfo) DataCenter() string {
 	h.mu.RLock()
 	defer h.mu.RUnlock()
@@ -170,6 +232,36 @@ func (h *HostInfo) setHostID(hostID string) *HostInfo {
 	return h
 }
 
+func (h *HostInfo) WorkLoad() string {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+	return h.workload
+}
+
+func (h *HostInfo) Graph() bool {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+	return h.graph
+}
+
+func (h *HostInfo) DSEVersion() string {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+	return h.dseVersion
+}
+
+func (h *HostInfo) Partitioner() string {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+	return h.partitioner
+}
+
+func (h *HostInfo) ClusterName() string {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+	return h.clusterName
+}
+
 func (h *HostInfo) Version() cassVersion {
 	h.mu.RLock()
 	defer h.mu.RUnlock()
@@ -239,25 +331,24 @@ func (h *HostInfo) IsUp() bool {
 func (h *HostInfo) String() string {
 	h.mu.RLock()
 	defer h.mu.RUnlock()
-	return fmt.Sprintf("[hostinfo peer=%q port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]", h.peer, h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
+	return fmt.Sprintf("[HostInfo connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+
+		"port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
+		h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress,
+		h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
 }
 
 // Polls system.peers at a specific interval to find new hosts
 type ringDescriber struct {
-	dcFilter   string
-	rackFilter string
-	session    *Session
-	closeChan  chan bool
-	// indicates that we can use system.local to get the connections remote address
-	localHasRpcAddr bool
-
+	session         *Session
 	mu              sync.Mutex
 	prevHosts       []*HostInfo
+	localHost       *HostInfo
 	prevPartitioner string
 }
 
-func checkSystemLocal(control *controlConn) (bool, error) {
-	iter := control.query("SELECT broadcast_address FROM system.local")
+// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
+func checkSystemSchema(control *controlConn) (bool, error) {
+	iter := control.query("SELECT * FROM system_schema.keyspaces")
 	if err := iter.err; err != nil {
 		if errf, ok := err.(*errorFrame); ok {
 			if errf.code == errSyntax {
@@ -271,106 +362,267 @@ func checkSystemLocal(control *controlConn) (bool, error) {
 	return true, nil
 }
 
-// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
-func checkSystemSchema(control *controlConn) (bool, error) {
-	iter := control.query("SELECT * FROM system_schema.keyspaces")
-	if err := iter.err; err != nil {
-		if errf, ok := err.(*errorFrame); ok {
-			if errf.code == errReadFailure {
-				return false, nil
+// Given a map that represents a row from either system.local or system.peers
+// return as much information as we can in *HostInfo
+func (r *ringDescriber) hostInfoFromMap(row map[string]interface{}) (*HostInfo, error) {
+	const assertErrorMsg = "Assertion failed for %s"
+	var ok bool
+
+	// Default to our connected port if the cluster doesn't have port information
+	host := HostInfo{
+		port: r.session.cfg.Port,
+	}
+
+	for key, value := range row {
+		switch key {
+		case "data_center":
+			host.dataCenter, ok = value.(string)
+			if !ok {
+				return nil, fmt.Errorf(assertErrorMsg, "data_center")
+			}
+		case "rack":
+			host.rack, ok = value.(string)
+			if !ok {
+				return nil, fmt.Errorf(assertErrorMsg, "rack")
+			}
+		case "host_id":
+			hostId, ok := value.(UUID)
+			if !ok {
+				return nil, fmt.Errorf(assertErrorMsg, "host_id")
+			}
+			host.hostId = hostId.String()
+		case "release_version":
+			version, ok := value.(string)
+			if !ok {
+				return nil, fmt.Errorf(assertErrorMsg, "release_version")
+			}
+			host.version.Set(version)
+		case "peer":
+			ip, ok := value.(string)
+			if !ok {
+				return nil, fmt.Errorf(assertErrorMsg, "peer")
+			}
+			host.peer = net.ParseIP(ip)
+		case "cluster_name":
+			host.clusterName, ok = value.(string)
+			if !ok {
+				return nil, fmt.Errorf(assertErrorMsg, "cluster_name")
+			}
+		case "partitioner":
+			host.partitioner, ok = value.(string)
+			if !ok {
+				return nil, fmt.Errorf(assertErrorMsg, "partitioner")
+			}
+		case "broadcast_address":
+			ip, ok := value.(string)
+			if !ok {
+				return nil, fmt.Errorf(assertErrorMsg, "broadcast_address")
+			}
+			host.broadcastAddress = net.ParseIP(ip)
+		case "preferred_ip":
+			ip, ok := value.(string)
+			if !ok {
+				return nil, fmt.Errorf(assertErrorMsg, "preferred_ip")
+			}
+			host.preferredIP = net.ParseIP(ip)
+		case "rpc_address":
+			ip, ok := value.(string)
+			if !ok {
+				return nil, fmt.Errorf(assertErrorMsg, "rpc_address")
+			}
+			host.rpcAddress = net.ParseIP(ip)
+		case "listen_address":
+			ip, ok := value.(string)
+			if !ok {
+				return nil, fmt.Errorf(assertErrorMsg, "listen_address")
+			}
+			host.listenAddress = net.ParseIP(ip)
+		case "workload":
+			host.workload, ok = value.(string)
+			if !ok {
+				return nil, fmt.Errorf(assertErrorMsg, "workload")
+			}
+		case "graph":
+			host.graph, ok = value.(bool)
+			if !ok {
+				return nil, fmt.Errorf(assertErrorMsg, "graph")
+			}
+		case "tokens":
+			host.tokens, ok = value.([]string)
+			if !ok {
+				return nil, fmt.Errorf(assertErrorMsg, "tokens")
+			}
+		case "dse_version":
+			host.dseVersion, ok = value.(string)
+			if !ok {
+				return nil, fmt.Errorf(assertErrorMsg, "dse_version")
 			}
 		}
+		// TODO(thrawn01): Add 'port'? once CASSANDRA-7544 is complete
+		// Not sure what the port field will be called until the JIRA issue is complete
+	}
 
-		return false, err
+	return &host, nil
+}
+
+// Ask the control node for its local host information
+func (r *ringDescriber) GetLocalHostInfo() (*HostInfo, error) {
+	it := r.session.control.query("SELECT * FROM system.local WHERE key='local'")
+	if it == nil {
+		return nil, errors.New("Attempted to query 'system.local' on a closed control connection")
 	}
+	return r.extractHostInfo(it)
+}
 
-	return true, nil
+// Given an ip address and port, return a peer that matches the ip address
+func (r *ringDescriber) GetPeerHostInfo(ip net.IP, port int) (*HostInfo, error) {
+	it := r.session.control.query("SELECT * FROM system.peers WHERE peer=?", ip)
+	if it == nil {
+		return nil, errors.New("Attempted to query 'system.peers' on a closed control connection")
+	}
+	return r.extractHostInfo(it)
 }
 
-func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err error) {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	// we need conn to be the same because we need to query system.peers and system.local
-	// on the same node to get the whole cluster
+func (r *ringDescriber) extractHostInfo(it *Iter) (*HostInfo, error) {
+	row := make(map[string]interface{})
 
-	const (
-		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
-		// only supported in 2.2.0, 2.1.6, 2.0.16
-		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
-	)
+	// expect only 1 row
+	it.MapScan(row)
+	if err := it.Close(); err != nil {
+		return nil, err
+	}
 
-	localHost := &HostInfo{}
-	if r.localHasRpcAddr {
-		iter := r.session.control.query(localQuery)
-		if iter == nil {
-			return r.prevHosts, r.prevPartitioner, nil
-		}
+	// extract all available info about the host
+	return r.hostInfoFromMap(row)
+}
 
-		iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
-			&localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)
+// Ask the control node for host info on all its known peers
+func (r *ringDescriber) GetClusterPeerInfo() ([]*HostInfo, error) {
+	var hosts []*HostInfo
 
-		if err = iter.Close(); err != nil {
-			return nil, "", err
-		}
-	} else {
-		iter := r.session.control.withConn(func(c *Conn) *Iter {
-			localHost = c.host
-			return c.query(legacyLocalQuery)
-		})
+	// Ask the node for a list of its peers
+	it := r.session.control.query("SELECT * FROM system.peers")
+	if it == nil {
+		return nil, errors.New("Attempted to query 'system.peers' on a closed connection")
+	}
 
-		if iter == nil {
-			return r.prevHosts, r.prevPartitioner, nil
+	for {
+		row := make(map[string]interface{})
+		if !it.MapScan(row) {
+			break
+		}
+		// extract all available info about the peer
+		host, err := r.hostInfoFromMap(row)
+		if err != nil {
+			return nil, err
 		}
 
-		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)
-
-		if err = iter.Close(); err != nil {
-			return nil, "", err
+		// If it's not a valid peer
+		if !r.IsValidPeer(host) {
+			Logger.Printf("Found invalid peer '%+v' "+
+				"Likely due to a gossip or snitch issue, this host will be ignored", host)
+			continue
 		}
+		hosts = append(hosts, host)
 	}
+	if it.err != nil {
+		return nil, fmt.Errorf("while scanning 'system.peers' table: %s", it.err)
+	}
+	return hosts, nil
+}
 
-	localHost.port = r.session.cfg.Port
+// Return true if the host is a valid peer
+func (r *ringDescriber) IsValidPeer(host *HostInfo) bool {
+	return !(len(host.RPCAddress()) == 0 ||
+		host.hostId == "" ||
+		host.dataCenter == "" ||
+		host.rack == "" ||
+		len(host.tokens) == 0)
+}
 
-	hosts = []*HostInfo{localHost}
+// Return a list of hosts the cluster knows about
+func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
 
-	rows := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens, release_version FROM system.peers").Scanner()
-	if rows == nil {
-		return r.prevHosts, r.prevPartitioner, nil
+	// Update the localHost info with data from the connected host
+	localHost, err := r.GetLocalHostInfo()
+	if err != nil {
+		return r.prevHosts, r.prevPartitioner, err
 	}
 
-	for rows.Next() {
-		host := &HostInfo{port: r.session.cfg.Port}
-		err := rows.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
-		if err != nil {
-			Logger.Println(err)
-			continue
-		}
+	// Update our list of hosts by querying the cluster
+	hosts, err := r.GetClusterPeerInfo()
+	if err != nil {
+		return r.prevHosts, r.prevPartitioner, err
+	}
 
-		if r.matchFilter(host) {
-			hosts = append(hosts, host)
+	hosts = append(hosts, localHost)
+
+	// Filter the hosts if a filter is provided
+	filteredHosts := hosts
+	if r.session.cfg.HostFilter != nil {
+		filteredHosts = filteredHosts[:0]
+		for _, host := range hosts {
+			if r.session.cfg.HostFilter.Accept(host) {
+				filteredHosts = append(filteredHosts, host)
+			}
 		}
 	}
 
-	if err = rows.Err(); err != nil {
-		return nil, "", err
+	r.prevHosts = filteredHosts
+	r.prevPartitioner = localHost.partitioner
+	r.localHost = localHost
+
+	return filteredHosts, localHost.partitioner, nil
+}
+
+// Given an ip/port return HostInfo for the specified ip/port
+func (r *ringDescriber) GetHostInfo(ip net.IP, port int) (*HostInfo, error) {
+	// TODO(thrawn01): Is IgnorePeerAddr still useful now that we have DisableInitialHostLookup?
+	// TODO(thrawn01): should we also check for DisableInitialHostLookup and return if true?
+
+	// Ignore the port and connect address and use the address/port we already have
+	if r.session.control == nil || r.session.cfg.IgnorePeerAddr {
+		return &HostInfo{connectAddress: ip, port: port}, nil
 	}
 
-	r.prevHosts = hosts
-	r.prevPartitioner = partitioner
+	// Attempt to get the host info for our control connection
+	controlHost := r.session.control.GetHostInfo()
+	if controlHost == nil {
+		return nil, errors.New("invalid control connection")
+	}
 
-	return hosts, partitioner, nil
-}
+	var (
+		host *HostInfo
+		err  error
+	)
+
+	// If we are asking about the same node our control connection has a connection to
+	if controlHost.ConnectAddress().Equal(ip) {
+		host, err = r.GetLocalHostInfo()
 
-func (r *ringDescriber) matchFilter(host *HostInfo) bool {
-	if r.dcFilter != "" && r.dcFilter != host.DataCenter() {
-		return false
+		// Always respect the provided control node address and disregard the ip address
+		// the cassandra node provides. We do this as we are already connected and have a
+		// known valid ip address. This insulates gocql from client connection issues stemming
+		// from node misconfiguration. For instance when a node is run from a container, by
+		// default the node will report its ip address as 127.0.0.1 which is typically invalid.
+		host.SetConnectAddress(ip)
+	} else {
+		host, err = r.GetPeerHostInfo(ip, port)
 	}
 
-	if r.rackFilter != "" && r.rackFilter != host.Rack() {
-		return false
+	// No host was found matching this ip/port
+	if err != nil {
+		return nil, err
 	}
 
-	return true
+	// Apply host filter to the result
+	if r.session.cfg.HostFilter != nil && r.session.cfg.HostFilter.Accept(host) != true {
+		return nil, err
+	}
+
+	return host, err
 }
 
 func (r *ringDescriber) refreshRing() error {
@@ -386,13 +638,11 @@ func (r *ringDescriber) refreshRing() error {
 	// TODO: move this to session
 	// TODO: handle removing hosts here
 	for _, h := range hosts {
-		if r.session.cfg.HostFilter == nil || r.session.cfg.HostFilter.Accept(h) {
-			if host, ok := r.session.ring.addHostIfMissing(h); !ok {
-				r.session.pool.addHost(h)
-				r.session.policy.AddHost(h)
-			} else {
-				host.update(h)
-			}
+		if host, ok := r.session.ring.addHostIfMissing(h); !ok {
+			r.session.pool.addHost(h)
+			r.session.policy.AddHost(h)
+		} else {
+			host.update(h)
 		}
 	}
 

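The "GetHosts() is now more C* version tolerant" claim rests on the pattern above: query SELECT * and pull out whichever columns this server version returns via MapScan, instead of scanning a fixed column list that changed between Cassandra releases. A minimal sketch of the same pattern through the public API; printPeers is a hypothetical helper and its column handling is only illustrative, not the driver's internal control-connection path.

package main

import (
	"fmt"
	"log"

	"github.com/gocql/gocql"
)

// printPeers reads each system.peers row into a map and uses only the columns
// that are actually present, the same tolerance hostInfoFromMap() relies on.
func printPeers(session *gocql.Session) {
	iter := session.Query("SELECT * FROM system.peers").Iter()
	for {
		row := make(map[string]interface{})
		if !iter.MapScan(row) {
			break
		}
		// rpc_address may be absent or 0.0.0.0 on misconfigured nodes; ConnectAddress()
		// falls back to the peer column in that case.
		if rpc, ok := row["rpc_address"].(string); ok {
			fmt.Println("peer rpc_address:", rpc)
		}
	}
	if err := iter.Close(); err != nil {
		log.Fatal(err)
	}
}

func main() {
	session, err := gocql.NewCluster("127.0.0.1").CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()
	printPeers(session)
}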
+ 63 - 1
host_source_test.go

@@ -1,6 +1,12 @@
+// +build all integration
+
 package gocql
 
-import "testing"
+import (
+	"fmt"
+	"net"
+	"testing"
+)
 
 func TestUnmarshalCassVersion(t *testing.T) {
 	tests := [...]struct {
@@ -42,3 +48,59 @@ func TestCassVersionBefore(t *testing.T) {
 	}
 
 }
+
+func TestIsValidPeer(t *testing.T) {
+	ring := ringDescriber{}
+	host := &HostInfo{
+		rpcAddress: net.ParseIP("0.0.0.0"),
+		rack:       "myRack",
+		hostId:     "0",
+		dataCenter: "datacenter",
+		tokens:     []string{"0", "1"},
+	}
+
+	if !ring.IsValidPeer(host) {
+		t.Errorf("expected %+v to be a valid peer", host)
+	}
+
+	host.rack = ""
+	if ring.IsValidPeer(host) {
+		t.Errorf("expected %+v to NOT be a valid peer", host)
+	}
+}
+
+func TestGetHosts(t *testing.T) {
+	cluster := createCluster()
+	session := createSessionFromCluster(cluster, t)
+
+	hosts, partitioner, err := session.hostSource.GetHosts()
+
+	assertTrue(t, "err == nil", err == nil)
+	assertTrue(t, "len(hosts) == 3", len(hosts) == 3)
+	assertTrue(t, "len(partitioner) != 0", len(partitioner) != 0)
+
+}
+
+func TestGetHostsWithFilter(t *testing.T) {
+	filterHostIP := net.ParseIP("127.0.0.3")
+	cluster := createCluster()
+
+	// Filter to remove one of the localhost nodes
+	cluster.HostFilter = HostFilterFunc(func(host *HostInfo) bool {
+		if host.ConnectAddress().Equal(filterHostIP) {
+			return false
+		}
+		return true
+	})
+	session := createSessionFromCluster(cluster, t)
+
+	hosts, partitioner, err := session.hostSource.GetHosts()
+	assertTrue(t, "err == nil", err == nil)
+	assertTrue(t, "len(hosts) == 2", len(hosts) == 2)
+	assertTrue(t, "len(partitioner) != 0", len(partitioner) != 0)
+	for _, host := range hosts {
+		if host.ConnectAddress().Equal(filterHostIP) {
+			t.Fatal(fmt.Sprintf("Did not expect to see '%q' in host list", filterHostIP))
+		}
+	}
+}

+ 8 - 8
policies.go

@@ -105,7 +105,7 @@ func (c *cowHostList) remove(ip net.IP) bool {
 	found := false
 	newL := make([]*HostInfo, 0, size)
 	for i := 0; i < len(l); i++ {
-		if !l[i].Peer().Equal(ip) {
+		if !l[i].ConnectAddress().Equal(ip) {
 			newL = append(newL, l[i])
 		} else {
 			found = true
@@ -267,7 +267,7 @@ func (r *roundRobinHostPolicy) AddHost(host *HostInfo) {
 }
 
 func (r *roundRobinHostPolicy) RemoveHost(host *HostInfo) {
-	r.hosts.remove(host.Peer())
+	r.hosts.remove(host.ConnectAddress())
 }
 
 func (r *roundRobinHostPolicy) HostUp(host *HostInfo) {
@@ -310,7 +310,7 @@ func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) {
 }
 
 func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) {
-	t.hosts.remove(host.Peer())
+	t.hosts.remove(host.ConnectAddress())
 	t.fallback.RemoveHost(host)
 
 	t.resetTokenRing()
@@ -424,7 +424,7 @@ func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) {
 	hostMap := make(map[string]*HostInfo, len(hosts))
 
 	for i, host := range hosts {
-		ip := host.Peer().String()
+		ip := host.ConnectAddress().String()
 		peers[i] = ip
 		hostMap[ip] = host
 	}
@@ -436,7 +436,7 @@ func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) {
 }
 
 func (r *hostPoolHostPolicy) AddHost(host *HostInfo) {
-	ip := host.Peer().String()
+	ip := host.ConnectAddress().String()
 
 	r.mu.Lock()
 	defer r.mu.Unlock()
@@ -457,7 +457,7 @@ func (r *hostPoolHostPolicy) AddHost(host *HostInfo) {
 }
 
 func (r *hostPoolHostPolicy) RemoveHost(host *HostInfo) {
-	ip := host.Peer().String()
+	ip := host.ConnectAddress().String()
 
 	r.mu.Lock()
 	defer r.mu.Unlock()
@@ -469,7 +469,7 @@ func (r *hostPoolHostPolicy) RemoveHost(host *HostInfo) {
 	delete(r.hostMap, ip)
 	hosts := make([]string, 0, len(r.hostMap))
 	for _, host := range r.hostMap {
-		hosts = append(hosts, host.Peer().String())
+		hosts = append(hosts, host.ConnectAddress().String())
 	}
 
 	r.hp.SetHosts(hosts)
@@ -523,7 +523,7 @@ func (host selectedHostPoolHost) Info() *HostInfo {
 }
 
 func (host selectedHostPoolHost) Mark(err error) {
-	ip := host.info.Peer().String()
+	ip := host.info.ConnectAddress().String()
 
 	host.policy.mu.RLock()
 	defer host.policy.mu.RUnlock()

+ 28 - 28
policies_test.go

@@ -18,8 +18,8 @@ func TestRoundRobinHostPolicy(t *testing.T) {
 	policy := RoundRobinHostPolicy()
 
 	hosts := [...]*HostInfo{
-		{hostId: "0", peer: net.IPv4(0, 0, 0, 1)},
-		{hostId: "1", peer: net.IPv4(0, 0, 0, 2)},
+		{hostId: "0", connectAddress: net.IPv4(0, 0, 0, 1)},
+		{hostId: "1", connectAddress: net.IPv4(0, 0, 0, 2)},
 	}
 
 	for _, host := range hosts {
@@ -69,10 +69,10 @@ func TestTokenAwareHostPolicy(t *testing.T) {
 
 	// set the hosts
 	hosts := [...]*HostInfo{
-		{peer: net.IPv4(10, 0, 0, 1), tokens: []string{"00"}},
-		{peer: net.IPv4(10, 0, 0, 2), tokens: []string{"25"}},
-		{peer: net.IPv4(10, 0, 0, 3), tokens: []string{"50"}},
-		{peer: net.IPv4(10, 0, 0, 4), tokens: []string{"75"}},
+		{connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00"}},
+		{connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"25"}},
+		{connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50"}},
+		{connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"75"}},
 	}
 	for _, host := range hosts {
 		policy.AddHost(host)
@@ -80,13 +80,13 @@ func TestTokenAwareHostPolicy(t *testing.T) {
 
 	// the token ring is not setup without the partitioner, but the fallback
 	// should work
-	if actual := policy.Pick(nil)(); !actual.Info().Peer().Equal(hosts[0].peer) {
-		t.Errorf("Expected peer 0 but was %s", actual.Info().Peer())
+	if actual := policy.Pick(nil)(); !actual.Info().ConnectAddress().Equal(hosts[0].ConnectAddress()) {
+		t.Errorf("Expected peer 0 but was %s", actual.Info().ConnectAddress())
 	}
 
 	query.RoutingKey([]byte("30"))
-	if actual := policy.Pick(query)(); !actual.Info().Peer().Equal(hosts[1].peer) {
-		t.Errorf("Expected peer 1 but was %s", actual.Info().Peer())
+	if actual := policy.Pick(query)(); !actual.Info().ConnectAddress().Equal(hosts[1].ConnectAddress()) {
+		t.Errorf("Expected peer 1 but was %s", actual.Info().ConnectAddress())
 	}
 
 	policy.SetPartitioner("OrderedPartitioner")
@@ -94,18 +94,18 @@ func TestTokenAwareHostPolicy(t *testing.T) {
 	// now the token ring is configured
 	query.RoutingKey([]byte("20"))
 	iter = policy.Pick(query)
-	if actual := iter(); !actual.Info().Peer().Equal(hosts[1].peer) {
-		t.Errorf("Expected peer 1 but was %s", actual.Info().Peer())
+	if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[1].ConnectAddress()) {
+		t.Errorf("Expected peer 1 but was %s", actual.Info().ConnectAddress())
 	}
 	// rest are round robin
-	if actual := iter(); !actual.Info().Peer().Equal(hosts[2].peer) {
-		t.Errorf("Expected peer 2 but was %s", actual.Info().Peer())
+	if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[2].ConnectAddress()) {
+		t.Errorf("Expected peer 2 but was %s", actual.Info().ConnectAddress())
 	}
-	if actual := iter(); !actual.Info().Peer().Equal(hosts[3].peer) {
-		t.Errorf("Expected peer 3 but was %s", actual.Info().Peer())
+	if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[3].ConnectAddress()) {
+		t.Errorf("Expected peer 3 but was %s", actual.Info().ConnectAddress())
 	}
-	if actual := iter(); !actual.Info().Peer().Equal(hosts[0].peer) {
-		t.Errorf("Expected peer 0 but was %s", actual.Info().Peer())
+	if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[0].ConnectAddress()) {
+		t.Errorf("Expected peer 0 but was %s", actual.Info().ConnectAddress())
 	}
 }
 
@@ -114,8 +114,8 @@ func TestHostPoolHostPolicy(t *testing.T) {
 	policy := HostPoolHostPolicy(hostpool.New(nil))
 
 	hosts := []*HostInfo{
-		{hostId: "0", peer: net.IPv4(10, 0, 0, 0)},
-		{hostId: "1", peer: net.IPv4(10, 0, 0, 1)},
+		{hostId: "0", connectAddress: net.IPv4(10, 0, 0, 0)},
+		{hostId: "1", connectAddress: net.IPv4(10, 0, 0, 1)},
 	}
 
 	// Using set host to control the ordering of the hosts as calling "AddHost" iterates the map
@@ -179,10 +179,10 @@ func TestTokenAwareNilHostInfo(t *testing.T) {
 	policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
 
 	hosts := [...]*HostInfo{
-		{peer: net.IPv4(10, 0, 0, 0), tokens: []string{"00"}},
-		{peer: net.IPv4(10, 0, 0, 1), tokens: []string{"25"}},
-		{peer: net.IPv4(10, 0, 0, 2), tokens: []string{"50"}},
-		{peer: net.IPv4(10, 0, 0, 3), tokens: []string{"75"}},
+		{connectAddress: net.IPv4(10, 0, 0, 0), tokens: []string{"00"}},
+		{connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"25"}},
+		{connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"50"}},
+		{connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"75"}},
 	}
 	for _, host := range hosts {
 		policy.AddHost(host)
@@ -198,8 +198,8 @@ func TestTokenAwareNilHostInfo(t *testing.T) {
 		t.Fatal("got nil host")
 	} else if v := next.Info(); v == nil {
 		t.Fatal("got nil HostInfo")
-	} else if !v.Peer().Equal(hosts[1].peer) {
-		t.Fatalf("expected peer 1 got %v", v.Peer())
+	} else if !v.ConnectAddress().Equal(hosts[1].ConnectAddress()) {
+		t.Fatalf("expected peer 1 got %v", v.ConnectAddress())
 	}
 
 	// Empty the hosts to trigger the panic when using the fallback.
@@ -222,7 +222,7 @@ func TestCOWList_Add(t *testing.T) {
 	toAdd := [...]net.IP{net.IPv4(0, 0, 0, 0), net.IPv4(1, 0, 0, 0), net.IPv4(2, 0, 0, 0)}
 
 	for _, addr := range toAdd {
-		if !cow.add(&HostInfo{peer: addr}) {
+		if !cow.add(&HostInfo{connectAddress: addr}) {
 			t.Fatal("did not add peer which was not in the set")
 		}
 	}
@@ -234,7 +234,7 @@ func TestCOWList_Add(t *testing.T) {
 
 	set := make(map[string]bool)
 	for _, host := range hosts {
-		set[string(host.Peer())] = true
+		set[string(host.ConnectAddress())] = true
 	}
 
 	for _, addr := range toAdd {

+ 3 - 3
ring.go

@@ -53,7 +53,7 @@ func (r *ring) allHosts() []*HostInfo {
 }
 
 func (r *ring) addHost(host *HostInfo) bool {
-	ip := host.Peer().String()
+	ip := host.ConnectAddress().String()
 
 	r.mu.Lock()
 	if r.hosts == nil {
@@ -79,7 +79,7 @@ func (r *ring) addOrUpdate(host *HostInfo) *HostInfo {
 }
 
 func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) {
-	ip := host.Peer().String()
+	ip := host.ConnectAddress().String()
 
 	r.mu.Lock()
 	if r.hosts == nil {
@@ -106,7 +106,7 @@ func (r *ring) removeHost(ip net.IP) bool {
 	_, ok := r.hosts[k]
 	if ok {
 		for i, host := range r.hostList {
-			if host.Peer().Equal(ip) {
+			if host.ConnectAddress().Equal(ip) {
 				r.hostList = append(r.hostList[:i], r.hostList[i+1:]...)
 				break
 			}

+ 3 - 3
ring_test.go

@@ -8,7 +8,7 @@ import (
 func TestRing_AddHostIfMissing_Missing(t *testing.T) {
 	ring := &ring{}
 
-	host := &HostInfo{peer: net.IPv4(1, 1, 1, 1)}
+	host := &HostInfo{connectAddress: net.IPv4(1, 1, 1, 1)}
 	h1, ok := ring.addHostIfMissing(host)
 	if ok {
 		t.Fatal("host was reported as already existing")
@@ -22,10 +22,10 @@ func TestRing_AddHostIfMissing_Missing(t *testing.T) {
 func TestRing_AddHostIfMissing_Existing(t *testing.T) {
 	ring := &ring{}
 
-	host := &HostInfo{peer: net.IPv4(1, 1, 1, 1)}
+	host := &HostInfo{connectAddress: net.IPv4(1, 1, 1, 1)}
 	ring.addHostIfMissing(host)
 
-	h2 := &HostInfo{peer: net.IPv4(1, 1, 1, 1)}
+	h2 := &HostInfo{connectAddress: net.IPv4(1, 1, 1, 1)}
 
 	h1, ok := ring.addHostIfMissing(h2)
 	if !ok {

+ 8 - 20
session.go

@@ -118,8 +118,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 	s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo)
 
 	s.hostSource = &ringDescriber{
-		session:   s,
-		closeChan: make(chan bool),
+		session: s,
 	}
 
 	if cfg.PoolConfig.HostSelectionPolicy == nil {
@@ -181,26 +180,19 @@ func (s *Session) init() error {
 			return err
 		}
 
-		// need to setup host source to check for broadcast_address in system.local
-		localHasRPCAddr, _ := checkSystemLocal(s.control)
-		s.hostSource.localHasRpcAddr = localHasRPCAddr
-
 		if !s.cfg.DisableInitialHostLookup {
-			// TODO(zariel): we need to get the partitioner from here
-			var p string
-			hosts, p, err = s.hostSource.GetHosts()
+			var partitioner string
+			hosts, partitioner, err = s.hostSource.GetHosts()
 			if err != nil {
 				return err
 			}
-			s.policy.SetPartitioner(p)
+			s.policy.SetPartitioner(partitioner)
 		}
 	}
 
 	for _, host := range hosts {
-		if s.cfg.HostFilter == nil || s.cfg.HostFilter.Accept(host) {
-			host = s.ring.addOrUpdate(host)
-			s.handleNodeUp(host.Peer(), host.Port(), false)
-		}
+		host = s.ring.addOrUpdate(host)
+		s.handleNodeUp(host.ConnectAddress(), host.Port(), false)
 	}
 
 	// TODO(zariel): we probably dont need this any more as we verify that we
@@ -241,7 +233,7 @@ func (s *Session) reconnectDownedHosts(intv time.Duration) {
 			if gocqlDebug {
 				buf := bytes.NewBufferString("Session.ring:")
 				for _, h := range hosts {
-					buf.WriteString("[" + h.Peer().String() + ":" + h.State().String() + "]")
+					buf.WriteString("[" + h.ConnectAddress().String() + ":" + h.State().String() + "]")
 				}
 				Logger.Println(buf.String())
 			}
@@ -250,7 +242,7 @@ func (s *Session) reconnectDownedHosts(intv time.Duration) {
 				if h.IsUp() {
 					continue
 				}
-				s.handleNodeUp(h.Peer(), h.Port(), true)
+				s.handleNodeUp(h.ConnectAddress(), h.Port(), true)
 			}
 		case <-s.quit:
 			return
@@ -351,10 +343,6 @@ func (s *Session) Close() {
 		s.pool.Close()
 	}
 
-	if s.hostSource != nil {
-		close(s.hostSource.closeChan)
-	}
-
 	if s.control != nil {
 		s.control.close()
 	}
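Session.init() above now leaves host filtering to GetHosts() and drops the old closeChan plumbing. Two existing ClusterConfig knobs interact with this code path, DisableInitialHostLookup and IgnorePeerAddr; a small configuration sketch with illustrative values follows.

package main

import (
	"log"

	"github.com/gocql/gocql"
)

func main() {
	cluster := gocql.NewCluster("127.0.0.1")
	// Skip the initial system.local/system.peers scan and start from the seed hosts only.
	cluster.DisableInitialHostLookup = true
	// Keep the user-provided addresses instead of the ones the cluster reports.
	cluster.IgnorePeerAddr = true

	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()
}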

+ 4 - 4
session_connect_test.go

@@ -103,8 +103,8 @@ func TestSession_connect_WithNoTranslator(t *testing.T) {
 	go srvr.Serve()
 
 	Connect(&HostInfo{
-		peer: srvr.Addr,
-		port: srvr.Port,
+		connectAddress: srvr.Addr,
+		port:           srvr.Port,
 	}, session.connCfg, testConnErrorHandler(t), session)
 
 	assertConnectionEventually(t, 500*time.Millisecond, srvr)
@@ -123,8 +123,8 @@ func TestSession_connect_WithTranslator(t *testing.T) {
 
 	// the provided address will be translated
 	Connect(&HostInfo{
-		peer: net.ParseIP("10.10.10.10"),
-		port: 5432,
+		connectAddress: net.ParseIP("10.10.10.10"),
+		port:           5432,
 	}, session.connCfg, testConnErrorHandler(t), session)
 
 	assertConnectionEventually(t, 500*time.Millisecond, srvr)

+ 1 - 1
token.go

@@ -184,7 +184,7 @@ func (t *tokenRing) String() string {
 		buf.WriteString("]")
 		buf.WriteString(t.tokens[i].String())
 		buf.WriteString(":")
-		buf.WriteString(t.hosts[i].Peer().String())
+		buf.WriteString(t.hosts[i].ConnectAddress().String())
 	}
 	buf.WriteString("\n}")
 	return string(buf.Bytes())

+ 21 - 18
token_test.go

@@ -232,8 +232,8 @@ func hostsForTests(n int) []*HostInfo {
 	hosts := make([]*HostInfo, n)
 	for i := 0; i < n; i++ {
 		host := &HostInfo{
-			peer:   net.IPv4(1, 1, 1, byte(n)),
-			tokens: []string{fmt.Sprintf("%d", n)},
+			connectAddress: net.IPv4(1, 1, 1, byte(n)),
+			tokens:         []string{fmt.Sprintf("%d", n)},
 		}
 
 		hosts[i] = host
@@ -254,19 +254,20 @@ func TestMurmur3TokenRing(t *testing.T) {
 
 	for _, host := range hosts {
 		actual := ring.GetHostForToken(p.ParseString(host.tokens[0]))
-		if !actual.Peer().Equal(host.peer) {
-			t.Errorf("Expected peer %v for token %q, but was %v", host.peer, host.tokens[0], actual.peer)
+		if !actual.ConnectAddress().Equal(host.ConnectAddress()) {
+			t.Errorf("Expected address %v for token %q, but was %v", host.ConnectAddress(),
+				host.tokens[0], actual.ConnectAddress())
 		}
 	}
 
 	actual := ring.GetHostForToken(p.ParseString("12"))
-	if !actual.Peer().Equal(hosts[1].peer) {
-		t.Errorf("Expected peer 1 for token \"12\", but was %s", actual.Peer())
+	if !actual.ConnectAddress().Equal(hosts[1].ConnectAddress()) {
+		t.Errorf("Expected address 1 for token \"12\", but was %s", actual.ConnectAddress())
 	}
 
 	actual = ring.GetHostForToken(p.ParseString("24324545443332"))
-	if !actual.Peer().Equal(hosts[0].peer) {
-		t.Errorf("Expected peer 0 for token \"24324545443332\", but was %s", actual.Peer())
+	if !actual.ConnectAddress().Equal(hosts[0].ConnectAddress()) {
+		t.Errorf("Expected address 0 for token \"24324545443332\", but was %s", actual.ConnectAddress())
 	}
 }
 
@@ -285,19 +286,20 @@ func TestOrderedTokenRing(t *testing.T) {
 	var actual *HostInfo
 	for _, host := range hosts {
 		actual = ring.GetHostForToken(p.ParseString(host.tokens[0]))
-		if !actual.Peer().Equal(host.peer) {
-			t.Errorf("Expected peer %v for token %q, but was %v", host.peer, host.tokens[0], actual.peer)
+		if !actual.ConnectAddress().Equal(host.ConnectAddress()) {
+			t.Errorf("Expected address %v for token %q, but was %v", host.ConnectAddress(),
+				host.tokens[0], actual.ConnectAddress())
 		}
 	}
 
 	actual = ring.GetHostForToken(p.ParseString("12"))
 	if !actual.peer.Equal(hosts[1].peer) {
-		t.Errorf("Expected peer 1 for token \"12\", but was %s", actual.Peer())
+		t.Errorf("Expected address 1 for token \"12\", but was %s", actual.ConnectAddress())
 	}
 
 	actual = ring.GetHostForToken(p.ParseString("24324545443332"))
-	if !actual.peer.Equal(hosts[1].peer) {
-		t.Errorf("Expected peer 1 for token \"24324545443332\", but was %s", actual.Peer())
+	if !actual.ConnectAddress().Equal(hosts[1].ConnectAddress()) {
+		t.Errorf("Expected address 1 for token \"24324545443332\", but was %s", actual.ConnectAddress())
 	}
 }
 
@@ -315,18 +317,19 @@ func TestRandomTokenRing(t *testing.T) {
 	var actual *HostInfo
 	for _, host := range hosts {
 		actual = ring.GetHostForToken(p.ParseString(host.tokens[0]))
-		if !actual.Peer().Equal(host.peer) {
-			t.Errorf("Expected peer %v for token %q, but was %v", host.peer, host.tokens[0], actual.peer)
+		if !actual.ConnectAddress().Equal(host.ConnectAddress()) {
+			t.Errorf("Expected address %v for token %q, but was %v", host.ConnectAddress(),
+				host.tokens[0], actual.ConnectAddress())
 		}
 	}
 
 	actual = ring.GetHostForToken(p.ParseString("12"))
 	if !actual.peer.Equal(hosts[1].peer) {
-		t.Errorf("Expected peer 1 for token \"12\", but was %s", actual.Peer())
+		t.Errorf("Expected address 1 for token \"12\", but was %s", actual.ConnectAddress())
 	}
 
 	actual = ring.GetHostForToken(p.ParseString("24324545443332"))
-	if !actual.peer.Equal(hosts[0].peer) {
-		t.Errorf("Expected peer 1 for token \"24324545443332\", but was %s", actual.Peer())
+	if !actual.ConnectAddress().Equal(hosts[0].ConnectAddress()) {
+		t.Errorf("Expected address 1 for token \"24324545443332\", but was %s", actual.ConnectAddress())
 	}
 }