Przeglądaj źródła

Move reconnection goroutine into a method

matope 9 lat temu
rodzic
commit
2ea86d3ae9
3 zmienionych plików z 26 dodań i 24 usunięć
  1. 1 1
      cassandra_test.go
  2. 0 1
      events_ccm_test.go
  3. 25 22
      session.go

+ 1 - 1
cassandra_test.go

@@ -566,7 +566,7 @@ func TestReconnection(t *testing.T) {
 	defer session.Close()
 
 	h := session.ring.allHosts()[0]
-	session.handleNodeDown(net.ParseIP(h.peer), h.port)
+	session.handleNodeDown(net.ParseIP(h.Peer()), h.Port())
 
 	if h.State() != NodeDown {
 		t.Fatal("Host should be NodeDown but not.")

+ 0 - 1
events_ccm_test.go

@@ -51,7 +51,6 @@ func TestEventNodeDownControl(t *testing.T) {
 	}
 
 	cluster := createCluster()
-	cluster.ReconnectInterval = 0
 	cluster.Hosts = []string{status[targetNode].Addr}
 	session := createSessionFromCluster(cluster, t)
 	defer session.Close()

+ 25 - 22
session.go

@@ -178,28 +178,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 	}
 
 	if cfg.ReconnectInterval > 0 {
-		go func() {
-			for !s.Closed() {
-				time.Sleep(cfg.ReconnectInterval)
-				hosts := s.ring.allHosts()
-
-				// Print session.ring for debug.
-				if gocqlDebug {
-					buf := bytes.NewBufferString("Session.ring:")
-					for _, h := range hosts {
-						buf.WriteString("[" + h.Peer() + ":" + h.State().String() + "]")
-					}
-					log.Println(buf.String())
-				}
-
-				for _, h := range hosts {
-					if h.IsUp() {
-						continue
-					}
-					s.handleNodeUp(net.ParseIP(h.peer), h.port, true)
-				}
-			}
-		}()
+		go s.reconnectDownedHosts(cfg.ReconnectInterval)
 	}
 
 	// TODO(zariel): we probably dont need this any more as we verify that we
@@ -215,6 +194,30 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 	return s, nil
 }
 
+func (s *Session) reconnectDownedHosts(intv time.Duration) {
+	for !s.Closed() {
+		time.Sleep(intv)
+
+		hosts := s.ring.allHosts()
+
+		// Print session.ring for debug.
+		if gocqlDebug {
+			buf := bytes.NewBufferString("Session.ring:")
+			for _, h := range hosts {
+				buf.WriteString("[" + h.Peer() + ":" + h.State().String() + "]")
+			}
+			log.Println(buf.String())
+		}
+
+		for _, h := range hosts {
+			if h.IsUp() {
+				continue
+			}
+			s.handleNodeUp(net.ParseIP(h.Peer()), h.Port(), true)
+		}
+	}
+}
+
 // SetConsistency sets the default consistency level for this session. This
 // setting can also be changed on a per-query basis and the default value
 // is Quorum.