浏览代码

Eliminate temporarily leaking goroutine in reconnectDownedHosts

Andrew de Andrade 9 年之前
父节点
当前提交
638e60134a
共有 1 个文件被更改,包括 29 次插入16 次删除
  1. 29 16
      session.go

+ 29 - 16
session.go

@@ -66,6 +66,9 @@ type Session struct {
 
 	cfg ClusterConfig
 
+	reconnectTicker *time.Ticker
+	quit            chan struct{}
+
 	closeMu  sync.RWMutex
 	isClosed bool
 }
@@ -210,25 +213,31 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 }
 
 func (s *Session) reconnectDownedHosts(intv time.Duration) {
-	for !s.Closed() {
-		time.Sleep(intv)
-
-		hosts := s.ring.allHosts()
-
-		// Print session.ring for debug.
-		if gocqlDebug {
-			buf := bytes.NewBufferString("Session.ring:")
-			for _, h := range hosts {
-				buf.WriteString("[" + h.Peer() + ":" + h.State().String() + "]")
+	s.quit = make(chan struct{})
+	s.reconnectTicker = time.NewTicker(intv)
+	for {
+		select {
+		case <-s.reconnectTicker.C:
+			hosts := s.ring.allHosts()
+
+			// Print session.ring for debug.
+			if gocqlDebug {
+				buf := bytes.NewBufferString("Session.ring:")
+				for _, h := range hosts {
+					buf.WriteString("[" + h.Peer() + ":" + h.State().String() + "]")
+				}
+				log.Println(buf.String())
 			}
-			log.Println(buf.String())
-		}
 
-		for _, h := range hosts {
-			if h.IsUp() {
-				continue
+			for _, h := range hosts {
+				if h.IsUp() {
+					continue
+				}
+				s.handleNodeUp(net.ParseIP(h.Peer()), h.Port(), true)
 			}
-			s.handleNodeUp(net.ParseIP(h.Peer()), h.Port(), true)
+		case <-s.quit:
+			s.reconnectTicker.Stop()
+			return
 		}
 	}
 }
@@ -341,6 +350,10 @@ func (s *Session) Close() {
 	if s.schemaEvents != nil {
 		s.schemaEvents.stop()
 	}
+
+	if s.quit != nil {
+		close(s.quit)
+	}
 }
 
 func (s *Session) Closed() bool {