Bläddra i källkod

Revert "GetHosts() is now more version tolerant"

Chris Bannister 8 år sedan
förälder
incheckning
ccce5fe285
19 ändrade filer med 267 tillägg och 537 borttagningar
  1. 4 4
      cassandra_test.go
  2. 3 3
      common_test.go
  3. 3 3
      conn.go
  4. 7 7
      connectionpool.go
  5. 43 4
      control.go
  6. 2 2
      control_test.go
  7. 19 11
      events.go
  8. 2 2
      filters.go
  9. 3 3
      filters_test.go
  10. 94 357
      host_source.go
  11. 1 63
      host_source_test.go
  12. 8 8
      policies.go
  13. 28 28
      policies_test.go
  14. 3 3
      ring.go
  15. 3 3
      ring_test.go
  16. 20 8
      session.go
  17. 5 6
      session_connect_test.go
  18. 1 1
      token.go
  19. 18 21
      token_test.go

+ 4 - 4
cassandra_test.go

@@ -67,7 +67,7 @@ func TestRingDiscovery(t *testing.T) {
 
 	if *clusterSize != size {
 		for p, pool := range session.pool.hostConnPools {
-			t.Logf("p=%q host=%v ips=%s", p, pool.host, pool.host.ConnectAddress().String())
+			t.Logf("p=%q host=%v ips=%s", p, pool.host, pool.host.Peer().String())
 
 		}
 		t.Errorf("Expected a cluster size of %d, but actual size was %d", *clusterSize, size)
@@ -577,7 +577,7 @@ func TestReconnection(t *testing.T) {
 	defer session.Close()
 
 	h := session.ring.allHosts()[0]
-	session.handleNodeDown(h.ConnectAddress(), h.Port())
+	session.handleNodeDown(h.Peer(), h.Port())
 
 	if h.State() != NodeDown {
 		t.Fatal("Host should be NodeDown but not.")
@@ -2416,8 +2416,8 @@ func TestDiscoverViaProxy(t *testing.T) {
 	session := createSessionFromCluster(cluster, t)
 	defer session.Close()
 
-	if session.hostSource.localHost.BroadcastAddress() == nil {
-		t.Skip("Target cluster does not have broadcast_address in system.local.")
+	if !session.hostSource.localHasRpcAddr {
+		t.Skip("Target cluster does not have rpc_address in system.local.")
 		goto close
 	}
 

+ 3 - 3
common_test.go

@@ -4,11 +4,11 @@ import (
 	"flag"
 	"fmt"
 	"log"
-	"net"
 	"strings"
 	"sync"
 	"testing"
 	"time"
+	"net"
 )
 
 var (
@@ -154,9 +154,9 @@ func createTestSession() *Session {
 	config.IgnorePeerAddr = true
 	config.PoolConfig.HostSelectionPolicy = RoundRobinHostPolicy()
 	session := &Session{
-		cfg: *config,
+		cfg:    *config,
 		connCfg: &ConnConfig{
-			Timeout:   10 * time.Millisecond,
+			Timeout: 10*time.Millisecond,
 			Keepalive: 0,
 		},
 		policy: config.PoolConfig.HostSelectionPolicy,

+ 3 - 3
conn.go

@@ -156,8 +156,8 @@ func Connect(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, ses
 	// TODO(zariel): remove these
 	if host == nil {
 		panic("host is nil")
-	} else if len(host.ConnectAddress()) == 0 {
-		panic("host missing connect ip address")
+	} else if len(host.Peer()) == 0 {
+		panic("host missing peer ip address")
 	} else if host.Port() == 0 {
 		panic("host missing port")
 	}
@@ -172,7 +172,7 @@ func Connect(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, ses
 	}
 
 	// TODO(zariel): handle ipv6 zone
-	translatedPeer, translatedPort := session.cfg.translateAddressPort(host.ConnectAddress(), host.Port())
+	translatedPeer, translatedPort := session.cfg.translateAddressPort(host.Peer(), host.Port())
 	addr := (&net.TCPAddr{IP: translatedPeer, Port: translatedPort}).String()
 	//addr := (&net.TCPAddr{IP: host.Peer(), Port: host.Port()}).String()
 

+ 7 - 7
connectionpool.go

@@ -128,7 +128,7 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
 			// don't create a connection pool for a down host
 			continue
 		}
-		ip := host.ConnectAddress().String()
+		ip := host.Peer().String()
 		if _, exists := p.hostConnPools[ip]; exists {
 			// still have this host, so don't remove it
 			delete(toRemove, ip)
@@ -154,7 +154,7 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
 		createCount--
 		if pool.Size() > 0 {
 			// add pool only if there a connections available
-			p.hostConnPools[string(pool.host.ConnectAddress())] = pool
+			p.hostConnPools[string(pool.host.Peer())] = pool
 		}
 	}
 
@@ -177,7 +177,7 @@ func (p *policyConnPool) Size() int {
 }
 
 func (p *policyConnPool) getPool(host *HostInfo) (pool *hostConnPool, ok bool) {
-	ip := host.ConnectAddress().String()
+	ip := host.Peer().String()
 	p.mu.RLock()
 	pool, ok = p.hostConnPools[ip]
 	p.mu.RUnlock()
@@ -196,7 +196,7 @@ func (p *policyConnPool) Close() {
 }
 
 func (p *policyConnPool) addHost(host *HostInfo) {
-	ip := host.ConnectAddress().String()
+	ip := host.Peer().String()
 	p.mu.Lock()
 	pool, ok := p.hostConnPools[ip]
 	if !ok {
@@ -274,7 +274,7 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int,
 		session:  session,
 		host:     host,
 		port:     port,
-		addr:     (&net.TCPAddr{IP: host.ConnectAddress(), Port: host.Port()}).String(),
+		addr:     (&net.TCPAddr{IP: host.Peer(), Port: host.Port()}).String(),
 		size:     size,
 		keyspace: keyspace,
 		conns:    make([]*Conn, 0, size),
@@ -398,7 +398,7 @@ func (pool *hostConnPool) fill() {
 
 			// this is call with the connection pool mutex held, this call will
 			// then recursively try to lock it again. FIXME
-			go pool.session.handleNodeDown(pool.host.ConnectAddress(), pool.port)
+			go pool.session.handleNodeDown(pool.host.Peer(), pool.port)
 			return
 		}
 
@@ -420,7 +420,7 @@ func (pool *hostConnPool) logConnectErr(err error) {
 		// connection refused
 		// these are typical during a node outage so avoid log spam.
 		if gocqlDebug {
-			Logger.Printf("unable to dial %q: %v\n", pool.host.ConnectAddress(), err)
+			Logger.Printf("unable to dial %q: %v\n", pool.host.Peer(), err)
 		}
 	} else if err != nil {
 		// unexpected error

+ 43 - 4
control.go

@@ -134,7 +134,7 @@ func hostInfo(addr string, defaultPort int) (*HostInfo, error) {
 
 	}
 
-	return &HostInfo{connectAddress: ip, port: port}, nil
+	return &HostInfo{peer: ip, port: port}, nil
 }
 
 func shuffleHosts(hosts []*HostInfo) []*HostInfo {
@@ -163,7 +163,7 @@ func (c *controlConn) shuffleDial(endpoints []*HostInfo) (*Conn, error) {
 			return conn, nil
 		}
 
-		Logger.Printf("gocql: unable to dial control conn %v: %v\n", host.ConnectAddress(), err)
+		Logger.Printf("gocql: unable to dial control conn %v: %v\n", host.Peer(), err)
 	}
 
 	return nil, err
@@ -316,7 +316,7 @@ func (c *controlConn) reconnect(refreshring bool) {
 		if err != nil {
 			// host is dead
 			// TODO: this is replicated in a few places
-			c.session.handleNodeDown(host.ConnectAddress(), host.Port())
+			c.session.handleNodeDown(host.Peer(), host.Port())
 		} else {
 			newConn = conn
 		}
@@ -422,13 +422,52 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter
 	return
 }
 
+func (c *controlConn) fetchHostInfo(ip net.IP, port int) (*HostInfo, error) {
+	// TODO(zariel): we should probably move this into host_source or atleast
+	// share code with it.
+	localHost := c.host()
+	if localHost == nil {
+		return nil, errors.New("unable to fetch host info, invalid conn host")
+	}
+
+	isLocal := localHost.Peer().Equal(ip)
+
+	var fn func(*HostInfo) error
+
+	// TODO(zariel): fetch preferred_ip address (is it >3.x only?)
+	if isLocal {
+		fn = func(host *HostInfo) error {
+			iter := c.query("SELECT data_center, rack, host_id, tokens, release_version FROM system.local WHERE key='local'")
+			iter.Scan(&host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
+			return iter.Close()
+		}
+	} else {
+		fn = func(host *HostInfo) error {
+			iter := c.query("SELECT data_center, rack, host_id, tokens, release_version FROM system.peers WHERE peer=?", ip)
+			iter.Scan(&host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
+			return iter.Close()
+		}
+	}
+
+	host := &HostInfo{
+		port: port,
+		peer: ip,
+	}
+
+	if err := fn(host); err != nil {
+		return nil, err
+	}
+
+	return host, nil
+}
+
 func (c *controlConn) awaitSchemaAgreement() error {
 	return c.withConn(func(conn *Conn) *Iter {
 		return &Iter{err: conn.awaitSchemaAgreement()}
 	}).err
 }
 
-func (c *controlConn) GetHostInfo() *HostInfo {
+func (c *controlConn) host() *HostInfo {
 	conn := c.conn.Load().(*Conn)
 	if conn == nil {
 		return nil

+ 2 - 2
control_test.go

@@ -24,8 +24,8 @@ func TestHostInfo_Lookup(t *testing.T) {
 			continue
 		}
 
-		if !host.ConnectAddress().Equal(test.ip) {
-			t.Errorf("expected ip %v got %v for addr %q", test.ip, host.ConnectAddress(), test.addr)
+		if !host.peer.Equal(test.ip) {
+			t.Errorf("expected ip %v got %v for addr %q", test.ip, host.peer, test.addr)
 		}
 	}
 }

+ 19 - 11
events.go

@@ -174,15 +174,23 @@ func (s *Session) handleNodeEvent(frames []frame) {
 }
 
 func (s *Session) handleNewNode(ip net.IP, port int, waitForBinary bool) {
-	// Get host info and apply any filters to the host
-	hostInfo, err := s.hostSource.GetHostInfo(ip, port)
-	if err != nil {
-		Logger.Printf("gocql: events: unable to fetch host info for (%s:%d): %v\n", ip, port, err)
-		return
+	var hostInfo *HostInfo
+	if s.control != nil && !s.cfg.IgnorePeerAddr {
+		var err error
+		hostInfo, err = s.control.fetchHostInfo(ip, port)
+		if err != nil {
+			Logger.Printf("gocql: events: unable to fetch host info for (%s:%d): %v\n", ip, port, err)
+			return
+		}
+	} else {
+		hostInfo = &HostInfo{peer: ip, port: port}
+	}
+
+	if s.cfg.IgnorePeerAddr && hostInfo.Peer().Equal(ip) {
+		hostInfo.setPeer(ip)
 	}
 
-	// If hostInfo is nil, this host was filtered out by cfg.HostFilter
-	if hostInfo == nil {
+	if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(hostInfo) {
 		return
 	}
 
@@ -208,7 +216,7 @@ func (s *Session) handleRemovedNode(ip net.IP, port int) {
 	// we remove all nodes but only add ones which pass the filter
 	host := s.ring.getHost(ip)
 	if host == nil {
-		host = &HostInfo{connectAddress: ip, port: port}
+		host = &HostInfo{peer: ip, port: port}
 	}
 
 	if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) {
@@ -232,9 +240,9 @@ func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) {
 
 	host := s.ring.getHost(ip)
 	if host != nil {
-		if s.cfg.IgnorePeerAddr && host.ConnectAddress().Equal(ip) {
+		if s.cfg.IgnorePeerAddr && host.Peer().Equal(ip) {
 			// TODO: how can this ever be true?
-			host.SetConnectAddress(ip)
+			host.setPeer(ip)
 		}
 
 		if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) {
@@ -261,7 +269,7 @@ func (s *Session) handleNodeDown(ip net.IP, port int) {
 
 	host := s.ring.getHost(ip)
 	if host == nil {
-		host = &HostInfo{connectAddress: ip, port: port}
+		host = &HostInfo{peer: ip, port: port}
 	}
 
 	if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) {

+ 2 - 2
filters.go

@@ -48,10 +48,10 @@ func WhiteListHostFilter(hosts ...string) HostFilter {
 
 	m := make(map[string]bool, len(hostInfos))
 	for _, host := range hostInfos {
-		m[string(host.ConnectAddress())] = true
+		m[string(host.peer)] = true
 	}
 
 	return HostFilterFunc(func(host *HostInfo) bool {
-		return m[string(host.ConnectAddress())]
+		return m[string(host.Peer())]
 	})
 }

+ 3 - 3
filters_test.go

@@ -17,7 +17,7 @@ func TestFilter_WhiteList(t *testing.T) {
 	}
 
 	for i, test := range tests {
-		if f.Accept(&HostInfo{connectAddress: test.addr}) {
+		if f.Accept(&HostInfo{peer: test.addr}) {
 			if !test.accept {
 				t.Errorf("%d: should not have been accepted but was", i)
 			}
@@ -39,7 +39,7 @@ func TestFilter_AllowAll(t *testing.T) {
 	}
 
 	for i, test := range tests {
-		if f.Accept(&HostInfo{connectAddress: test.addr}) {
+		if f.Accept(&HostInfo{peer: test.addr}) {
 			if !test.accept {
 				t.Errorf("%d: should not have been accepted but was", i)
 			}
@@ -61,7 +61,7 @@ func TestFilter_DenyAll(t *testing.T) {
 	}
 
 	for i, test := range tests {
-		if f.Accept(&HostInfo{connectAddress: test.addr}) {
+		if f.Accept(&HostInfo{peer: test.addr}) {
 			if !test.accept {
 				t.Errorf("%d: should not have been accepted but was", i)
 			}

+ 94 - 357
host_source.go

@@ -7,12 +7,8 @@ import (
 	"strings"
 	"sync"
 	"time"
-
-	"github.com/pkg/errors"
 )
 
-const assertErrorMsg = "Assertion failed for %s"
-
 type nodeState int32
 
 func (n nodeState) String() string {
@@ -102,25 +98,15 @@ func (c cassVersion) nodeUpDelay() time.Duration {
 type HostInfo struct {
 	// TODO(zariel): reduce locking maybe, not all values will change, but to ensure
 	// that we are thread safe use a mutex to access all fields.
-	mu               sync.RWMutex
-	peer             net.IP
-	broadcastAddress net.IP
-	listenAddress    net.IP
-	rpcAddress       net.IP
-	preferredIP      net.IP
-	connectAddress   net.IP
-	port             int
-	dataCenter       string
-	rack             string
-	hostId           string
-	workload         string
-	graph            bool
-	dseVersion       string
-	partitioner      string
-	clusterName      string
-	version          cassVersion
-	state            nodeState
-	tokens           []string
+	mu         sync.RWMutex
+	peer       net.IP
+	port       int
+	dataCenter string
+	rack       string
+	hostId     string
+	version    cassVersion
+	state      nodeState
+	tokens     []string
 }
 
 func (h *HostInfo) Equal(host *HostInfo) bool {
@@ -129,7 +115,7 @@ func (h *HostInfo) Equal(host *HostInfo) bool {
 	host.mu.RLock()
 	defer host.mu.RUnlock()
 
-	return h.ConnectAddress().Equal(host.ConnectAddress())
+	return h.peer.Equal(host.peer)
 }
 
 func (h *HostInfo) Peer() net.IP {
@@ -145,57 +131,6 @@ func (h *HostInfo) setPeer(peer net.IP) *HostInfo {
 	return h
 }
 
-// Returns the address that should be used to connect to the host
-// This defaults to 'broadcast_address', then falls back to 'peer'
-// This is to maintain existing functionality. If you wish to
-// override this, use an AddressTranslator or use a HostFilter
-// to SetConnectAddress()
-func (h *HostInfo) ConnectAddress() net.IP {
-	h.mu.RLock()
-	defer h.mu.RUnlock()
-
-	if h.connectAddress == nil {
-		if h.broadcastAddress != nil {
-			return h.broadcastAddress
-		}
-		if h.peer != nil {
-			return h.peer
-		}
-	}
-	return h.connectAddress
-}
-
-func (h *HostInfo) SetConnectAddress(address net.IP) *HostInfo {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	h.connectAddress = address
-	return h
-}
-
-func (h *HostInfo) BroadcastAddress() net.IP {
-	h.mu.RLock()
-	defer h.mu.RUnlock()
-	return h.broadcastAddress
-}
-
-func (h *HostInfo) ListenAddress() net.IP {
-	h.mu.RLock()
-	defer h.mu.RUnlock()
-	return h.listenAddress
-}
-
-func (h *HostInfo) RPCAddress() net.IP {
-	h.mu.RLock()
-	defer h.mu.RUnlock()
-	return h.rpcAddress
-}
-
-func (h *HostInfo) PreferredIP() net.IP {
-	h.mu.RLock()
-	defer h.mu.RUnlock()
-	return h.preferredIP
-}
-
 func (h *HostInfo) DataCenter() string {
 	h.mu.RLock()
 	defer h.mu.RUnlock()
@@ -235,36 +170,6 @@ func (h *HostInfo) setHostID(hostID string) *HostInfo {
 	return h
 }
 
-func (h *HostInfo) WorkLoad() string {
-	h.mu.RLock()
-	defer h.mu.RUnlock()
-	return h.workload
-}
-
-func (h *HostInfo) Graph() bool {
-	h.mu.RLock()
-	defer h.mu.RUnlock()
-	return h.graph
-}
-
-func (h *HostInfo) DSEVersion() string {
-	h.mu.RLock()
-	defer h.mu.RUnlock()
-	return h.dseVersion
-}
-
-func (h *HostInfo) Partitioner() string {
-	h.mu.RLock()
-	defer h.mu.RUnlock()
-	return h.partitioner
-}
-
-func (h *HostInfo) ClusterName() string {
-	h.mu.RLock()
-	defer h.mu.RUnlock()
-	return h.clusterName
-}
-
 func (h *HostInfo) Version() cassVersion {
 	h.mu.RLock()
 	defer h.mu.RUnlock()
@@ -334,27 +239,28 @@ func (h *HostInfo) IsUp() bool {
 func (h *HostInfo) String() string {
 	h.mu.RLock()
 	defer h.mu.RUnlock()
-	return fmt.Sprintf("[HostInfo connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+
-		"port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
-		h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress,
-		h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
+	return fmt.Sprintf("[hostinfo peer=%q port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]", h.peer, h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
 }
 
 // Polls system.peers at a specific interval to find new hosts
 type ringDescriber struct {
-	session         *Session
+	dcFilter   string
+	rackFilter string
+	session    *Session
+	closeChan  chan bool
+	// indicates that we can use system.local to get the connections remote address
+	localHasRpcAddr bool
+
 	mu              sync.Mutex
 	prevHosts       []*HostInfo
-	localHost       *HostInfo
 	prevPartitioner string
 }
 
-// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
-func checkSystemSchema(control *controlConn) (bool, error) {
-	iter := control.query("SELECT * FROM system_schema.keyspaces")
+func checkSystemLocal(control *controlConn) (bool, error) {
+	iter := control.query("SELECT broadcast_address FROM system.local")
 	if err := iter.err; err != nil {
 		if errf, ok := err.(*errorFrame); ok {
-			if errf.code == errReadFailure {
+			if errf.code == errSyntax {
 				return false, nil
 			}
 		}
@@ -365,277 +271,106 @@ func checkSystemSchema(control *controlConn) (bool, error) {
 	return true, nil
 }
 
-// Given a map that represents a row from either system.local or system.peers
-// return as much information as we can in *HostInfo
-func (r *ringDescriber) hostInfoFromMap(row map[string]interface{}) (error, *HostInfo) {
-	host := HostInfo{}
-	var ok bool
-
-	for key, value := range row {
-		switch key {
-		case "data_center":
-			host.dataCenter, ok = value.(string)
-			if !ok {
-				return fmt.Errorf(assertErrorMsg, "data_center"), nil
-			}
-		case "rack":
-			host.rack, ok = value.(string)
-			if !ok {
-				return fmt.Errorf(assertErrorMsg, "rack"), nil
-			}
-		case "host_id":
-			hostId, ok := value.(UUID)
-			if !ok {
-				return fmt.Errorf(assertErrorMsg, "host_id"), nil
-			}
-			host.hostId = hostId.String()
-		case "release_version":
-			version, ok := value.(string)
-			if !ok {
-				return fmt.Errorf(assertErrorMsg, "release_version"), nil
-			}
-			host.version.Set(version)
-		case "peer":
-			ip, ok := value.(string)
-			if !ok {
-				return fmt.Errorf(assertErrorMsg, "peer"), nil
-			}
-			host.peer = net.ParseIP(ip)
-		case "cluster_name":
-			host.clusterName, ok = value.(string)
-			if !ok {
-				return fmt.Errorf(assertErrorMsg, "cluster_name"), nil
-			}
-		case "partitioner":
-			host.partitioner, ok = value.(string)
-			if !ok {
-				return fmt.Errorf(assertErrorMsg, "partitioner"), nil
-			}
-		case "broadcast_address":
-			ip, ok := value.(string)
-			if !ok {
-				return fmt.Errorf(assertErrorMsg, "broadcast_address"), nil
-			}
-			host.broadcastAddress = net.ParseIP(ip)
-		case "preferred_ip":
-			ip, ok := value.(string)
-			if !ok {
-				return fmt.Errorf(assertErrorMsg, "preferred_ip"), nil
-			}
-			host.preferredIP = net.ParseIP(ip)
-		case "rpc_address":
-			ip, ok := value.(string)
-			if !ok {
-				return fmt.Errorf(assertErrorMsg, "rpc_address"), nil
-			}
-			host.rpcAddress = net.ParseIP(ip)
-		case "listen_address":
-			ip, ok := value.(string)
-			if !ok {
-				return fmt.Errorf(assertErrorMsg, "listen_address"), nil
-			}
-			host.listenAddress = net.ParseIP(ip)
-		case "workload":
-			host.workload, ok = value.(string)
-			if !ok {
-				return fmt.Errorf(assertErrorMsg, "workload"), nil
-			}
-		case "graph":
-			host.graph, ok = value.(bool)
-			if !ok {
-				return fmt.Errorf(assertErrorMsg, "graph"), nil
-			}
-		case "tokens":
-			host.tokens, ok = value.([]string)
-			if !ok {
-				return fmt.Errorf(assertErrorMsg, "tokens"), nil
-			}
-		case "dse_version":
-			host.dseVersion, ok = value.(string)
-			if !ok {
-				return fmt.Errorf(assertErrorMsg, "dse_version"), nil
+// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
+func checkSystemSchema(control *controlConn) (bool, error) {
+	iter := control.query("SELECT * FROM system_schema.keyspaces")
+	if err := iter.err; err != nil {
+		if errf, ok := err.(*errorFrame); ok {
+			if errf.code == errReadFailure {
+				return false, nil
 			}
 		}
-		// TODO(thrawn01): Add 'port'? once CASSANDRA-7544 is complete
-		// Not sure what the port field will be called until the JIRA issue is complete
-	}
 
-	// Default to our connected port if the cluster doesn't have port information
-	if host.port == 0 {
-		host.port = r.session.cfg.Port
+		return false, err
 	}
 
-	return nil, &host
+	return true, nil
 }
 
-// Ask the control node for it's local host information
-func (r *ringDescriber) GetLocalHostInfo() (*HostInfo, error) {
-	row := make(map[string]interface{})
-
-	// ask the connected node for local host info
-	it := r.session.control.query("SELECT * FROM system.local WHERE key='local'")
-	if it == nil {
-		return nil, errors.New("Attempted to query 'system.local' on a closed control connection")
-	}
-
-	// expect only 1 row
-	it.MapScan(row)
-	if err := it.Close(); err != nil {
-		return nil, err
-	}
-
-	// extract all available info about the host
-	err, host := r.hostInfoFromMap(row)
-	if err != nil {
-		return nil, err
-	}
+func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err error) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	// we need conn to be the same because we need to query system.peers and system.local
+	// on the same node to get the whole cluster
+
+	const (
+		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
+		// only supported in 2.2.0, 2.1.6, 2.0.16
+		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
+	)
+
+	localHost := &HostInfo{}
+	if r.localHasRpcAddr {
+		iter := r.session.control.query(localQuery)
+		if iter == nil {
+			return r.prevHosts, r.prevPartitioner, nil
+		}
 
-	return host, err
-}
+		iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
+			&localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)
 
-// Given an ip address and port, return a peer that matched the ip address
-func (r *ringDescriber) GetPeerHostInfo(ip net.IP, port int) (*HostInfo, error) {
-	row := make(map[string]interface{})
+		if err = iter.Close(); err != nil {
+			return nil, "", err
+		}
+	} else {
+		iter := r.session.control.withConn(func(c *Conn) *Iter {
+			localHost = c.host
+			return c.query(legacyLocalQuery)
+		})
 
-	it := r.session.control.query("SELECT * FROM system.peers WHERE peer=?", ip)
-	if it == nil {
-		return nil, errors.New("Attempted to query 'system.peers' on a closed control connection")
-	}
+		if iter == nil {
+			return r.prevHosts, r.prevPartitioner, nil
+		}
 
-	// expect only 1 row
-	it.MapScan(row)
-	if err := it.Close(); err != nil {
-		return nil, err
-	}
+		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)
 
-	// extract all available info about the host
-	err, host := r.hostInfoFromMap(row)
-	if err != nil {
-		return nil, err
+		if err = iter.Close(); err != nil {
+			return nil, "", err
+		}
 	}
 
-	return host, err
-}
+	localHost.port = r.session.cfg.Port
 
-// Ask the control node for host info on all it's known peers
-func (r *ringDescriber) GetClusterPeerInfo() ([]*HostInfo, error) {
-	var hosts []*HostInfo
+	hosts = []*HostInfo{localHost}
 
-	// Ask the node for a list of it's peers
-	it := r.session.control.query("SELECT * FROM system.peers")
-	if it == nil {
-		return nil, errors.New("Attempted to query 'system.peers' on a closed connection")
+	rows := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens, release_version FROM system.peers").Scanner()
+	if rows == nil {
+		return r.prevHosts, r.prevPartitioner, nil
 	}
 
-	for {
-		row := make(map[string]interface{})
-		if !it.MapScan(row) {
-			break
-		}
-		// extract all available info about the peer
-		err, host := r.hostInfoFromMap(row)
+	for rows.Next() {
+		host := &HostInfo{port: r.session.cfg.Port}
+		err := rows.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
 		if err != nil {
-			return nil, err
-		}
-
-		// If it's not a valid peer
-		if !r.IsValidPeer(host) {
-			Logger.Printf("Found invalid peer '%+v' "+
-				"Likely due to a gossip or snitch issue, this host will be ignored", host)
+			Logger.Println(err)
 			continue
 		}
-		hosts = append(hosts, host)
-	}
-	if it.err != nil {
-		return nil, errors.Wrap(it.err, "GetClusterPeerInfo()")
-	}
-	return hosts, nil
-}
-
-// Return true if the host is a valid peer
-func (r *ringDescriber) IsValidPeer(host *HostInfo) bool {
-	return !(len(host.RPCAddress()) == 0 ||
-		host.hostId == "" ||
-		host.dataCenter == "" ||
-		host.rack == "" ||
-		len(host.tokens) == 0)
-}
 
-// Return a list of hosts the cluster knows about
-func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-
-	// Update the localHost info with data from the connected host
-	localHost, err := r.GetLocalHostInfo()
-	if err != nil {
-		return r.prevHosts, r.prevPartitioner, err
-	}
-
-	// Update our list of hosts by querying the cluster
-	hosts, err := r.GetClusterPeerInfo()
-	if err != nil {
-		return r.prevHosts, r.prevPartitioner, err
-	}
-
-	hosts = append(hosts, localHost)
-
-	// Filter the hosts if filter is provided
-	var filteredHosts []*HostInfo
-	if r.session.cfg.HostFilter != nil {
-		for _, host := range hosts {
-			if r.session.cfg.HostFilter.Accept(host) {
-				filteredHosts = append(filteredHosts, host)
-			}
+		if r.matchFilter(host) {
+			hosts = append(hosts, host)
 		}
-	} else {
-		filteredHosts = hosts
 	}
 
-	r.prevHosts = filteredHosts
-	r.prevPartitioner = localHost.partitioner
-	r.localHost = localHost
-
-	return filteredHosts, localHost.partitioner, nil
-}
-
-// Given an ip/port return HostInfo for the specified ip/port
-func (r *ringDescriber) GetHostInfo(ip net.IP, port int) (*HostInfo, error) {
-	var host *HostInfo
-	var err error
-
-	// TODO(thrawn01): Is IgnorePeerAddr still useful now that we have DisableInitialHostLookup?
-	// TODO(thrawn01): should we also check for DisableInitialHostLookup and return if true?
-
-	// Ignore the port and connect address and use the address/port we already have
-	if r.session.control == nil || r.session.cfg.IgnorePeerAddr {
-		return &HostInfo{connectAddress: ip, port: port}, nil
+	if err = rows.Err(); err != nil {
+		return nil, "", err
 	}
 
-	// Attempt to get the host info for our control connection
-	controlHost := r.session.control.GetHostInfo()
-	if controlHost == nil {
-		return nil, errors.New("invalid control connection")
-	}
+	r.prevHosts = hosts
+	r.prevPartitioner = partitioner
 
-	// If we are asking about the same node our control connection has a connection too
-	if controlHost.ConnectAddress().Equal(ip) {
-		host, err = r.GetLocalHostInfo()
-	} else {
-		host, err = r.GetPeerHostInfo(ip, port)
-	}
+	return hosts, partitioner, nil
+}
 
-	// No host was found matching this ip/port
-	if err != nil {
-		return nil, err
+func (r *ringDescriber) matchFilter(host *HostInfo) bool {
+	if r.dcFilter != "" && r.dcFilter != host.DataCenter() {
+		return false
 	}
 
-	// Apply host filter to the result
-	if r.session.cfg.HostFilter != nil && r.session.cfg.HostFilter.Accept(host) != true {
-		return nil, err
+	if r.rackFilter != "" && r.rackFilter != host.Rack() {
+		return false
 	}
 
-	return host, err
+	return true
 }
 
 func (r *ringDescriber) refreshRing() error {
@@ -651,10 +386,12 @@ func (r *ringDescriber) refreshRing() error {
 	// TODO: move this to session
 	// TODO: handle removing hosts here
 	for _, h := range hosts {
-		if host, ok := r.session.ring.addHostIfMissing(h); !ok {
-			r.session.pool.addHost(h)
-		} else {
-			host.update(h)
+		if r.session.cfg.HostFilter == nil || r.session.cfg.HostFilter.Accept(h) {
+			if host, ok := r.session.ring.addHostIfMissing(h); !ok {
+				r.session.pool.addHost(h)
+			} else {
+				host.update(h)
+			}
 		}
 	}
 

+ 1 - 63
host_source_test.go

@@ -1,12 +1,6 @@
-// +build all integration
-
 package gocql
 
-import (
-	"fmt"
-	"net"
-	"testing"
-)
+import "testing"
 
 func TestUnmarshalCassVersion(t *testing.T) {
 	tests := [...]struct {
@@ -48,59 +42,3 @@ func TestCassVersionBefore(t *testing.T) {
 	}
 
 }
-
-func TestIsValidPeer(t *testing.T) {
-	ring := ringDescriber{}
-	host := &HostInfo{
-		rpcAddress: net.ParseIP("0.0.0.0"),
-		rack:       "myRack",
-		hostId:     "0",
-		dataCenter: "datacenter",
-		tokens:     []string{"0", "1"},
-	}
-
-	if !ring.IsValidPeer(host) {
-		t.Errorf("expected %+v to be a valid peer", host)
-	}
-
-	host.rack = ""
-	if ring.IsValidPeer(host) {
-		t.Errorf("expected %+v to NOT be a valid peer", host)
-	}
-}
-
-func TestGetHosts(t *testing.T) {
-	cluster := createCluster()
-	session := createSessionFromCluster(cluster, t)
-
-	hosts, partitioner, err := session.hostSource.GetHosts()
-
-	assertTrue(t, "err == nil", err == nil)
-	assertTrue(t, "len(hosts) == 3", len(hosts) == 3)
-	assertTrue(t, "len(partitioner) != 0", len(partitioner) != 0)
-
-}
-
-func TestGetHostsWithFilter(t *testing.T) {
-	filterHostIP := net.ParseIP("127.0.0.3")
-	cluster := createCluster()
-
-	// Filter to remove one of the localhost nodes
-	cluster.HostFilter = HostFilterFunc(func(host *HostInfo) bool {
-		if host.ConnectAddress().Equal(filterHostIP) {
-			return false
-		}
-		return true
-	})
-	session := createSessionFromCluster(cluster, t)
-
-	hosts, partitioner, err := session.hostSource.GetHosts()
-	assertTrue(t, "err == nil", err == nil)
-	assertTrue(t, "len(hosts) == 2", len(hosts) == 2)
-	assertTrue(t, "len(partitioner) != 0", len(partitioner) != 0)
-	for _, host := range hosts {
-		if host.ConnectAddress().Equal(filterHostIP) {
-			t.Fatal(fmt.Sprintf("Did not expect to see '%q' in host list", filterHostIP))
-		}
-	}
-}

+ 8 - 8
policies.go

@@ -102,7 +102,7 @@ func (c *cowHostList) remove(ip net.IP) bool {
 	found := false
 	newL := make([]*HostInfo, 0, size)
 	for i := 0; i < len(l); i++ {
-		if !l[i].ConnectAddress().Equal(ip) {
+		if !l[i].Peer().Equal(ip) {
 			newL = append(newL, l[i])
 		} else {
 			found = true
@@ -236,7 +236,7 @@ func (r *roundRobinHostPolicy) AddHost(host *HostInfo) {
 }
 
 func (r *roundRobinHostPolicy) RemoveHost(host *HostInfo) {
-	r.hosts.remove(host.ConnectAddress())
+	r.hosts.remove(host.Peer())
 }
 
 func (r *roundRobinHostPolicy) HostUp(host *HostInfo) {
@@ -279,7 +279,7 @@ func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) {
 }
 
 func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) {
-	t.hosts.remove(host.ConnectAddress())
+	t.hosts.remove(host.Peer())
 	t.fallback.RemoveHost(host)
 
 	t.resetTokenRing()
@@ -393,7 +393,7 @@ func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) {
 	hostMap := make(map[string]*HostInfo, len(hosts))
 
 	for i, host := range hosts {
-		ip := host.ConnectAddress().String()
+		ip := host.Peer().String()
 		peers[i] = ip
 		hostMap[ip] = host
 	}
@@ -405,7 +405,7 @@ func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) {
 }
 
 func (r *hostPoolHostPolicy) AddHost(host *HostInfo) {
-	ip := host.ConnectAddress().String()
+	ip := host.Peer().String()
 
 	r.mu.Lock()
 	defer r.mu.Unlock()
@@ -426,7 +426,7 @@ func (r *hostPoolHostPolicy) AddHost(host *HostInfo) {
 }
 
 func (r *hostPoolHostPolicy) RemoveHost(host *HostInfo) {
-	ip := host.ConnectAddress().String()
+	ip := host.Peer().String()
 
 	r.mu.Lock()
 	defer r.mu.Unlock()
@@ -438,7 +438,7 @@ func (r *hostPoolHostPolicy) RemoveHost(host *HostInfo) {
 	delete(r.hostMap, ip)
 	hosts := make([]string, 0, len(r.hostMap))
 	for _, host := range r.hostMap {
-		hosts = append(hosts, host.ConnectAddress().String())
+		hosts = append(hosts, host.Peer().String())
 	}
 
 	r.hp.SetHosts(hosts)
@@ -492,7 +492,7 @@ func (host selectedHostPoolHost) Info() *HostInfo {
 }
 
 func (host selectedHostPoolHost) Mark(err error) {
-	ip := host.info.ConnectAddress().String()
+	ip := host.info.Peer().String()
 
 	host.policy.mu.RLock()
 	defer host.policy.mu.RUnlock()

+ 28 - 28
policies_test.go

@@ -17,8 +17,8 @@ func TestRoundRobinHostPolicy(t *testing.T) {
 	policy := RoundRobinHostPolicy()
 
 	hosts := [...]*HostInfo{
-		{hostId: "0", connectAddress: net.IPv4(0, 0, 0, 1)},
-		{hostId: "1", connectAddress: net.IPv4(0, 0, 0, 2)},
+		{hostId: "0", peer: net.IPv4(0, 0, 0, 1)},
+		{hostId: "1", peer: net.IPv4(0, 0, 0, 2)},
 	}
 
 	for _, host := range hosts {
@@ -68,10 +68,10 @@ func TestTokenAwareHostPolicy(t *testing.T) {
 
 	// set the hosts
 	hosts := [...]*HostInfo{
-		{connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00"}},
-		{connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"25"}},
-		{connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50"}},
-		{connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"75"}},
+		{peer: net.IPv4(10, 0, 0, 1), tokens: []string{"00"}},
+		{peer: net.IPv4(10, 0, 0, 2), tokens: []string{"25"}},
+		{peer: net.IPv4(10, 0, 0, 3), tokens: []string{"50"}},
+		{peer: net.IPv4(10, 0, 0, 4), tokens: []string{"75"}},
 	}
 	for _, host := range hosts {
 		policy.AddHost(host)
@@ -79,13 +79,13 @@ func TestTokenAwareHostPolicy(t *testing.T) {
 
 	// the token ring is not setup without the partitioner, but the fallback
 	// should work
-	if actual := policy.Pick(nil)(); !actual.Info().ConnectAddress().Equal(hosts[0].ConnectAddress()) {
-		t.Errorf("Expected peer 0 but was %s", actual.Info().ConnectAddress())
+	if actual := policy.Pick(nil)(); !actual.Info().Peer().Equal(hosts[0].peer) {
+		t.Errorf("Expected peer 0 but was %s", actual.Info().Peer())
 	}
 
 	query.RoutingKey([]byte("30"))
-	if actual := policy.Pick(query)(); !actual.Info().ConnectAddress().Equal(hosts[1].ConnectAddress()) {
-		t.Errorf("Expected peer 1 but was %s", actual.Info().ConnectAddress())
+	if actual := policy.Pick(query)(); !actual.Info().Peer().Equal(hosts[1].peer) {
+		t.Errorf("Expected peer 1 but was %s", actual.Info().Peer())
 	}
 
 	policy.SetPartitioner("OrderedPartitioner")
@@ -93,18 +93,18 @@ func TestTokenAwareHostPolicy(t *testing.T) {
 	// now the token ring is configured
 	query.RoutingKey([]byte("20"))
 	iter = policy.Pick(query)
-	if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[1].ConnectAddress()) {
-		t.Errorf("Expected peer 1 but was %s", actual.Info().ConnectAddress())
+	if actual := iter(); !actual.Info().Peer().Equal(hosts[1].peer) {
+		t.Errorf("Expected peer 1 but was %s", actual.Info().Peer())
 	}
 	// rest are round robin
-	if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[2].ConnectAddress()) {
-		t.Errorf("Expected peer 2 but was %s", actual.Info().ConnectAddress())
+	if actual := iter(); !actual.Info().Peer().Equal(hosts[2].peer) {
+		t.Errorf("Expected peer 2 but was %s", actual.Info().Peer())
 	}
-	if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[3].ConnectAddress()) {
-		t.Errorf("Expected peer 3 but was %s", actual.Info().ConnectAddress())
+	if actual := iter(); !actual.Info().Peer().Equal(hosts[3].peer) {
+		t.Errorf("Expected peer 3 but was %s", actual.Info().Peer())
 	}
-	if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[0].ConnectAddress()) {
-		t.Errorf("Expected peer 0 but was %s", actual.Info().ConnectAddress())
+	if actual := iter(); !actual.Info().Peer().Equal(hosts[0].peer) {
+		t.Errorf("Expected peer 0 but was %s", actual.Info().Peer())
 	}
 }
 
@@ -113,8 +113,8 @@ func TestHostPoolHostPolicy(t *testing.T) {
 	policy := HostPoolHostPolicy(hostpool.New(nil))
 
 	hosts := []*HostInfo{
-		{hostId: "0", connectAddress: net.IPv4(10, 0, 0, 0)},
-		{hostId: "1", connectAddress: net.IPv4(10, 0, 0, 1)},
+		{hostId: "0", peer: net.IPv4(10, 0, 0, 0)},
+		{hostId: "1", peer: net.IPv4(10, 0, 0, 1)},
 	}
 
 	// Using set host to control the ordering of the hosts as calling "AddHost" iterates the map
@@ -178,10 +178,10 @@ func TestTokenAwareNilHostInfo(t *testing.T) {
 	policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
 
 	hosts := [...]*HostInfo{
-		{connectAddress: net.IPv4(10, 0, 0, 0), tokens: []string{"00"}},
-		{connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"25"}},
-		{connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"50"}},
-		{connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"75"}},
+		{peer: net.IPv4(10, 0, 0, 0), tokens: []string{"00"}},
+		{peer: net.IPv4(10, 0, 0, 1), tokens: []string{"25"}},
+		{peer: net.IPv4(10, 0, 0, 2), tokens: []string{"50"}},
+		{peer: net.IPv4(10, 0, 0, 3), tokens: []string{"75"}},
 	}
 	for _, host := range hosts {
 		policy.AddHost(host)
@@ -197,8 +197,8 @@ func TestTokenAwareNilHostInfo(t *testing.T) {
 		t.Fatal("got nil host")
 	} else if v := next.Info(); v == nil {
 		t.Fatal("got nil HostInfo")
-	} else if !v.ConnectAddress().Equal(hosts[1].ConnectAddress()) {
-		t.Fatalf("expected peer 1 got %v", v.ConnectAddress())
+	} else if !v.Peer().Equal(hosts[1].peer) {
+		t.Fatalf("expected peer 1 got %v", v.Peer())
 	}
 
 	// Empty the hosts to trigger the panic when using the fallback.
@@ -221,7 +221,7 @@ func TestCOWList_Add(t *testing.T) {
 	toAdd := [...]net.IP{net.IPv4(0, 0, 0, 0), net.IPv4(1, 0, 0, 0), net.IPv4(2, 0, 0, 0)}
 
 	for _, addr := range toAdd {
-		if !cow.add(&HostInfo{connectAddress: addr}) {
+		if !cow.add(&HostInfo{peer: addr}) {
 			t.Fatal("did not add peer which was not in the set")
 		}
 	}
@@ -233,7 +233,7 @@ func TestCOWList_Add(t *testing.T) {
 
 	set := make(map[string]bool)
 	for _, host := range hosts {
-		set[string(host.ConnectAddress())] = true
+		set[string(host.Peer())] = true
 	}
 
 	for _, addr := range toAdd {

+ 3 - 3
ring.go

@@ -53,7 +53,7 @@ func (r *ring) allHosts() []*HostInfo {
 }
 
 func (r *ring) addHost(host *HostInfo) bool {
-	ip := host.ConnectAddress().String()
+	ip := host.Peer().String()
 
 	r.mu.Lock()
 	if r.hosts == nil {
@@ -79,7 +79,7 @@ func (r *ring) addOrUpdate(host *HostInfo) *HostInfo {
 }
 
 func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) {
-	ip := host.ConnectAddress().String()
+	ip := host.Peer().String()
 
 	r.mu.Lock()
 	if r.hosts == nil {
@@ -106,7 +106,7 @@ func (r *ring) removeHost(ip net.IP) bool {
 	_, ok := r.hosts[k]
 	if ok {
 		for i, host := range r.hostList {
-			if host.ConnectAddress().Equal(ip) {
+			if host.Peer().Equal(ip) {
 				r.hostList = append(r.hostList[:i], r.hostList[i+1:]...)
 				break
 			}

+ 3 - 3
ring_test.go

@@ -8,7 +8,7 @@ import (
 func TestRing_AddHostIfMissing_Missing(t *testing.T) {
 	ring := &ring{}
 
-	host := &HostInfo{connectAddress: net.IPv4(1, 1, 1, 1)}
+	host := &HostInfo{peer: net.IPv4(1, 1, 1, 1)}
 	h1, ok := ring.addHostIfMissing(host)
 	if ok {
 		t.Fatal("host was reported as already existing")
@@ -22,10 +22,10 @@ func TestRing_AddHostIfMissing_Missing(t *testing.T) {
 func TestRing_AddHostIfMissing_Existing(t *testing.T) {
 	ring := &ring{}
 
-	host := &HostInfo{connectAddress: net.IPv4(1, 1, 1, 1)}
+	host := &HostInfo{peer: net.IPv4(1, 1, 1, 1)}
 	ring.addHostIfMissing(host)
 
-	h2 := &HostInfo{connectAddress: net.IPv4(1, 1, 1, 1)}
+	h2 := &HostInfo{peer: net.IPv4(1, 1, 1, 1)}
 
 	h1, ok := ring.addHostIfMissing(h2)
 	if !ok {

+ 20 - 8
session.go

@@ -111,7 +111,8 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 	s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo)
 
 	s.hostSource = &ringDescriber{
-		session: s,
+		session:   s,
+		closeChan: make(chan bool),
 	}
 
 	if cfg.PoolConfig.HostSelectionPolicy == nil {
@@ -173,19 +174,26 @@ func (s *Session) init() error {
 			return err
 		}
 
+		// need to setup host source to check for broadcast_address in system.local
+		localHasRPCAddr, _ := checkSystemLocal(s.control)
+		s.hostSource.localHasRpcAddr = localHasRPCAddr
+
 		if !s.cfg.DisableInitialHostLookup {
-			var partitioner string
-			hosts, partitioner, err = s.hostSource.GetHosts()
+			// TODO(zariel): we need to get the partitioner from here
+			var p string
+			hosts, p, err = s.hostSource.GetHosts()
 			if err != nil {
 				return err
 			}
-			s.policy.SetPartitioner(partitioner)
+			s.policy.SetPartitioner(p)
 		}
 	}
 
 	for _, host := range hosts {
-		host = s.ring.addOrUpdate(host)
-		s.handleNodeUp(host.ConnectAddress(), host.Port(), false)
+		if s.cfg.HostFilter == nil || s.cfg.HostFilter.Accept(host) {
+			host = s.ring.addOrUpdate(host)
+			s.handleNodeUp(host.Peer(), host.Port(), false)
+		}
 	}
 
 	// TODO(zariel): we probably dont need this any more as we verify that we
@@ -226,7 +234,7 @@ func (s *Session) reconnectDownedHosts(intv time.Duration) {
 			if gocqlDebug {
 				buf := bytes.NewBufferString("Session.ring:")
 				for _, h := range hosts {
-					buf.WriteString("[" + h.ConnectAddress().String() + ":" + h.State().String() + "]")
+					buf.WriteString("[" + h.Peer().String() + ":" + h.State().String() + "]")
 				}
 				Logger.Println(buf.String())
 			}
@@ -235,7 +243,7 @@ func (s *Session) reconnectDownedHosts(intv time.Duration) {
 				if h.IsUp() {
 					continue
 				}
-				s.handleNodeUp(h.ConnectAddress(), h.Port(), true)
+				s.handleNodeUp(h.Peer(), h.Port(), true)
 			}
 		case <-s.quit:
 			return
@@ -336,6 +344,10 @@ func (s *Session) Close() {
 		s.pool.Close()
 	}
 
+	if s.hostSource != nil {
+		close(s.hostSource.closeChan)
+	}
+
 	if s.control != nil {
 		s.control.close()
 	}

+ 5 - 6
session_connect_test.go

@@ -1,13 +1,12 @@
 package gocql
 
 import (
+	"golang.org/x/net/context"
 	"net"
 	"strconv"
 	"sync"
 	"testing"
 	"time"
-
-	"golang.org/x/net/context"
 )
 
 type OneConnTestServer struct {
@@ -104,8 +103,8 @@ func TestSession_connect_WithNoTranslator(t *testing.T) {
 	go srvr.Serve()
 
 	Connect(&HostInfo{
-		connectAddress: srvr.Addr,
-		port:           srvr.Port,
+		peer: srvr.Addr,
+		port: srvr.Port,
 	}, session.connCfg, testConnErrorHandler(t), session)
 
 	assertConnectionEventually(t, 500*time.Millisecond, srvr)
@@ -124,8 +123,8 @@ func TestSession_connect_WithTranslator(t *testing.T) {
 
 	// the provided address will be translated
 	Connect(&HostInfo{
-		connectAddress: net.ParseIP("10.10.10.10"),
-		port:           5432,
+		peer: net.ParseIP("10.10.10.10"),
+		port: 5432,
 	}, session.connCfg, testConnErrorHandler(t), session)
 
 	assertConnectionEventually(t, 500*time.Millisecond, srvr)

+ 1 - 1
token.go

@@ -184,7 +184,7 @@ func (t *tokenRing) String() string {
 		buf.WriteString("]")
 		buf.WriteString(t.tokens[i].String())
 		buf.WriteString(":")
-		buf.WriteString(t.hosts[i].ConnectAddress().String())
+		buf.WriteString(t.hosts[i].Peer().String())
 	}
 	buf.WriteString("\n}")
 	return string(buf.Bytes())

+ 18 - 21
token_test.go

@@ -232,8 +232,8 @@ func hostsForTests(n int) []*HostInfo {
 	hosts := make([]*HostInfo, n)
 	for i := 0; i < n; i++ {
 		host := &HostInfo{
-			connectAddress: net.IPv4(1, 1, 1, byte(n)),
-			tokens:         []string{fmt.Sprintf("%d", n)},
+			peer:   net.IPv4(1, 1, 1, byte(n)),
+			tokens: []string{fmt.Sprintf("%d", n)},
 		}
 
 		hosts[i] = host
@@ -254,20 +254,19 @@ func TestMurmur3TokenRing(t *testing.T) {
 
 	for _, host := range hosts {
 		actual := ring.GetHostForToken(p.ParseString(host.tokens[0]))
-		if !actual.ConnectAddress().Equal(host.ConnectAddress()) {
-			t.Errorf("Expected address %v for token %q, but was %v", host.ConnectAddress(),
-				host.tokens[0], actual.ConnectAddress())
+		if !actual.Peer().Equal(host.peer) {
+			t.Errorf("Expected peer %v for token %q, but was %v", host.peer, host.tokens[0], actual.peer)
 		}
 	}
 
 	actual := ring.GetHostForToken(p.ParseString("12"))
-	if !actual.ConnectAddress().Equal(hosts[1].ConnectAddress()) {
-		t.Errorf("Expected address 1 for token \"12\", but was %s", actual.ConnectAddress())
+	if !actual.Peer().Equal(hosts[1].peer) {
+		t.Errorf("Expected peer 1 for token \"12\", but was %s", actual.Peer())
 	}
 
 	actual = ring.GetHostForToken(p.ParseString("24324545443332"))
-	if !actual.ConnectAddress().Equal(hosts[0].ConnectAddress()) {
-		t.Errorf("Expected address 0 for token \"24324545443332\", but was %s", actual.ConnectAddress())
+	if !actual.Peer().Equal(hosts[0].peer) {
+		t.Errorf("Expected peer 0 for token \"24324545443332\", but was %s", actual.Peer())
 	}
 }
 
@@ -286,20 +285,19 @@ func TestOrderedTokenRing(t *testing.T) {
 	var actual *HostInfo
 	for _, host := range hosts {
 		actual = ring.GetHostForToken(p.ParseString(host.tokens[0]))
-		if !actual.ConnectAddress().Equal(host.ConnectAddress()) {
-			t.Errorf("Expected address %v for token %q, but was %v", host.ConnectAddress(),
-				host.tokens[0], actual.ConnectAddress())
+		if !actual.Peer().Equal(host.peer) {
+			t.Errorf("Expected peer %v for token %q, but was %v", host.peer, host.tokens[0], actual.peer)
 		}
 	}
 
 	actual = ring.GetHostForToken(p.ParseString("12"))
 	if !actual.peer.Equal(hosts[1].peer) {
-		t.Errorf("Expected address 1 for token \"12\", but was %s", actual.ConnectAddress())
+		t.Errorf("Expected peer 1 for token \"12\", but was %s", actual.Peer())
 	}
 
 	actual = ring.GetHostForToken(p.ParseString("24324545443332"))
-	if !actual.ConnectAddress().Equal(hosts[1].ConnectAddress()) {
-		t.Errorf("Expected address 1 for token \"24324545443332\", but was %s", actual.ConnectAddress())
+	if !actual.peer.Equal(hosts[1].peer) {
+		t.Errorf("Expected peer 1 for token \"24324545443332\", but was %s", actual.Peer())
 	}
 }
 
@@ -317,19 +315,18 @@ func TestRandomTokenRing(t *testing.T) {
 	var actual *HostInfo
 	for _, host := range hosts {
 		actual = ring.GetHostForToken(p.ParseString(host.tokens[0]))
-		if !actual.ConnectAddress().Equal(host.ConnectAddress()) {
-			t.Errorf("Expected address %v for token %q, but was %v", host.ConnectAddress(),
-				host.tokens[0], actual.ConnectAddress())
+		if !actual.Peer().Equal(host.peer) {
+			t.Errorf("Expected peer %v for token %q, but was %v", host.peer, host.tokens[0], actual.peer)
 		}
 	}
 
 	actual = ring.GetHostForToken(p.ParseString("12"))
 	if !actual.peer.Equal(hosts[1].peer) {
-		t.Errorf("Expected address 1 for token \"12\", but was %s", actual.ConnectAddress())
+		t.Errorf("Expected peer 1 for token \"12\", but was %s", actual.Peer())
 	}
 
 	actual = ring.GetHostForToken(p.ParseString("24324545443332"))
-	if !actual.ConnectAddress().Equal(hosts[0].ConnectAddress()) {
-		t.Errorf("Expected address 1 for token \"24324545443332\", but was %s", actual.ConnectAddress())
+	if !actual.peer.Equal(hosts[0].peer) {
+		t.Errorf("Expected peer 1 for token \"24324545443332\", but was %s", actual.Peer())
 	}
 }