浏览代码

events: add to flag to conditionally wait for startup

Only wait for nodes if they came from an event.
Chris Bannister 10 年之前
父节点
当前提交
f111b798fc
共有 2 个文件被更改,包括 8 次插入9 次删除
  1. 7 8
      events.go
  2. 1 1
      session.go

+ 7 - 8
events.go

@@ -114,14 +114,14 @@ func (s *Session) handleNodeEvent(frames []frame) {
 		log.Printf("debouncing event %+v\n", f)
 		switch f.change {
 		case "NEW_NODE":
-			s.handleNewNode(f.host, f.port)
+			s.handleNewNode(f.host, f.port, true)
 		case "REMOVED_NODE":
 			s.handleRemovedNode(f.host, f.port)
 		case "MOVED_NODE":
 		// java-driver handles this, not mentioned in the spec
 		// TODO(zariel): refresh token map
 		case "UP":
-			s.handleNodeUp(f.host, f.port)
+			s.handleNodeUp(f.host, f.port, true)
 		case "DOWN":
 			s.handleNodeDown(f.host, f.port)
 		}
@@ -153,7 +153,7 @@ func (s *Session) handleEvent(framer *framer) {
 
 }
 
-func (s *Session) handleNewNode(host net.IP, port int) {
+func (s *Session) handleNewNode(host net.IP, port int, waitForBinary bool) {
 	// TODO(zariel): need to be able to filter discovered nodes
 
 	var hostInfo *HostInfo
@@ -169,7 +169,7 @@ func (s *Session) handleNewNode(host net.IP, port int) {
 		hostInfo = &HostInfo{peer: host.String(), port: port, state: NodeUp}
 	}
 
-	if t := hostInfo.Version().nodeUpDelay(); t > 0 {
+	if t := hostInfo.Version().nodeUpDelay(); t > 0 && waitForBinary {
 		time.Sleep(t)
 	}
 
@@ -195,11 +195,11 @@ func (s *Session) handleRemovedNode(ip net.IP, port int) {
 	s.hostSource.refreshRing()
 }
 
-func (s *Session) handleNodeUp(ip net.IP, port int) {
+func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) {
 	addr := ip.String()
 	host := s.ring.getHost(addr)
 	if host != nil {
-		if t := host.Version().nodeUpDelay(); t > 0 {
+		if t := host.Version().nodeUpDelay(); t > 0 && waitForBinary {
 			time.Sleep(t)
 		}
 
@@ -208,8 +208,7 @@ func (s *Session) handleNodeUp(ip net.IP, port int) {
 		return
 	}
 
-	// TODO: this could infinite loop
-	s.handleNewNode(ip, port)
+	s.handleNewNode(ip, port, waitForBinary)
 }
 
 func (s *Session) handleNodeDown(ip net.IP, port int) {

+ 1 - 1
session.go

@@ -139,7 +139,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 	}
 
 	for _, host := range hosts {
-		s.handleNodeUp(net.ParseIP(host.Peer()), host.Port())
+		s.handleNodeUp(net.ParseIP(host.Peer()), host.Port(), false)
 	}
 
 	// TODO(zariel): we probably dont need this any more as we verify that we