浏览代码

Merge pull request #588 from Zariel/ignore-cassandra-peer-addr

Add option to ignore cassandras reported address
Chris Bannister 10 年之前
父节点
当前提交
4fdf2f5d6a
共有 3 个文件被更改,包括 57 次插入23 次删除
  1. 16 0
      cluster.go
  2. 9 0
      events.go
  3. 32 23
      session.go

+ 16 - 0
cluster.go

@@ -126,6 +126,22 @@ type ClusterConfig struct {
 	// via Discovery
 	HostFilter HostFilter
 
+	// If IgnorePeerAddr is true and the address in system.peers does not match
+	// the supplied host by either initial hosts or discovered via events then the
+	// host will be replaced with the supplied address.
+	//
+	// For example if an event comes in with host=10.0.0.1 but when looking up that
+	// address in system.local or system.peers returns 127.0.0.1, the peer will be
+	// set to 10.0.0.1 which is what will be used to connect to.
+	IgnorePeerAddr bool
+
+	// If DisableInitialHostLookup then the driver will not attempt to get host info
+	// from the system.peers table, this will mean that the driver will connect to
+	// hosts supplied and will not attempt to lookup the hosts information, this will
+	// mean that data_centre, rack and token information will not be available and as
+	// such host filtering and token aware query routing will not be available.
+	DisableInitialHostLookup bool
+
 	// internal config for testing
 	disableControlConn bool
 }

+ 9 - 0
events.go

@@ -167,6 +167,11 @@ func (s *Session) handleNewNode(host net.IP, port int, waitForBinary bool) {
 		hostInfo = &HostInfo{peer: host.String(), port: port, state: NodeUp}
 	}
 
+	addr := host.String()
+	if s.cfg.IgnorePeerAddr && hostInfo.Peer() != addr {
+		hostInfo.setPeer(addr)
+	}
+
 	if s.cfg.HostFilter != nil {
 		if !s.cfg.HostFilter.Accept(hostInfo) {
 			return
@@ -216,6 +221,10 @@ func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) {
 	addr := ip.String()
 	host := s.ring.getHost(addr)
 	if host != nil {
+		if s.cfg.IgnorePeerAddr && host.Peer() != addr {
+			host.setPeer(addr)
+		}
+
 		if s.cfg.HostFilter != nil {
 			if !s.cfg.HostFilter.Accept(host) {
 				return

+ 32 - 23
session.go

@@ -58,6 +58,26 @@ type Session struct {
 	isClosed bool
 }
 
+func addrsToHosts(addrs []string, defaultPort int) ([]*HostInfo, error) {
+	hosts := make([]*HostInfo, len(addrs))
+	for i, hostport := range addrs {
+		// TODO: remove duplication
+		addr, portStr, err := net.SplitHostPort(JoinHostPort(hostport, defaultPort))
+		if err != nil {
+			return nil, fmt.Errorf("NewSession: unable to parse hostport of addr %q: %v", hostport, err)
+		}
+
+		port, err := strconv.Atoi(portStr)
+		if err != nil {
+			return nil, fmt.Errorf("NewSession: invalid port for hostport of addr %q: %v", hostport, err)
+		}
+
+		hosts[i] = &HostInfo{peer: addr, port: port, state: NodeUp}
+	}
+
+	return hosts, nil
+}
+
 // NewSession wraps an existing Node.
 func NewSession(cfg ClusterConfig) (*Session, error) {
 	//Check that hosts in the ClusterConfig is not empty
@@ -109,38 +129,27 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 		// need to setup host source to check for broadcast_address in system.local
 		localHasRPCAddr, _ := checkSystemLocal(s.control)
 		s.hostSource.localHasRpcAddr = localHasRPCAddr
-		hosts, _, err = s.hostSource.GetHosts()
+
+		var err error
+		if cfg.DisableInitialHostLookup {
+			// TODO: we could look at system.local to get token and other metadata
+			// in this case.
+			hosts, err = addrsToHosts(cfg.Hosts, cfg.Port)
+		} else {
+			hosts, _, err = s.hostSource.GetHosts()
+		}
+
 		if err != nil {
 			s.Close()
 			return nil, err
 		}
-
-		for _, host := range hosts {
-			s.ring.addHost(host)
-		}
-
 	} else {
 		// we dont get host info
-		hosts = make([]*HostInfo, len(cfg.Hosts))
-		for i, hostport := range cfg.Hosts {
-			// TODO: remove duplication
-			addr, portStr, err := net.SplitHostPort(JoinHostPort(hostport, cfg.Port))
-			if err != nil {
-				s.Close()
-				return nil, fmt.Errorf("NewSession: unable to parse hostport of addr %q: %v", hostport, err)
-			}
-
-			port, err := strconv.Atoi(portStr)
-			if err != nil {
-				s.Close()
-				return nil, fmt.Errorf("NewSession: invalid port for hostport of addr %q: %v", hostport, err)
-			}
-
-			hosts[i] = &HostInfo{peer: addr, port: port, state: NodeUp}
-		}
+		hosts, err = addrsToHosts(cfg.Hosts, cfg.Port)
 	}
 
 	for _, host := range hosts {
+		s.ring.addHost(host)
 		s.handleNodeUp(net.ParseIP(host.Peer()), host.Port(), false)
 	}