Browse Source

Merge pull request #776 from malandrew/reconnect_hosts_quit_channel_and_ticker

Eliminate temporarily leaking goroutine in reconnectDownedHosts
Chris Bannister 9 years ago
parent
commit
7813700e41
2 changed files with 31 additions and 16 deletions
  1. 1 0
      AUTHORS
  2. 30 16
      session.go

+ 1 - 0
AUTHORS

@@ -77,3 +77,4 @@ Pekka Enberg <penberg@scylladb.com>
 Mark M <m.mim95@gmail.com>
 Bartosz Burclaf <burclaf@gmail.com>
 Marcus King <marcusking01@gmail.com>
+Andrew de Andrade <andrew@deandrade.com.br>

+ 30 - 16
session.go

@@ -66,6 +66,8 @@ type Session struct {
 
 	cfg ClusterConfig
 
+	quit chan struct{}
+
 	closeMu  sync.RWMutex
 	isClosed bool
 }
@@ -183,6 +185,8 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 		}
 	}
 
+	s.quit = make(chan struct{})
+
 	if cfg.ReconnectInterval > 0 {
 		go s.reconnectDownedHosts(cfg.ReconnectInterval)
 	}
@@ -210,25 +214,31 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 }
 
 func (s *Session) reconnectDownedHosts(intv time.Duration) {
-	for !s.Closed() {
-		time.Sleep(intv)
-
-		hosts := s.ring.allHosts()
-
-		// Print session.ring for debug.
-		if gocqlDebug {
-			buf := bytes.NewBufferString("Session.ring:")
-			for _, h := range hosts {
-				buf.WriteString("[" + h.Peer() + ":" + h.State().String() + "]")
+	reconnectTicker := time.NewTicker(intv)
+	defer reconnectTicker.Stop()
+
+	for {
+		select {
+		case <-reconnectTicker.C:
+			hosts := s.ring.allHosts()
+
+			// Print session.ring for debug.
+			if gocqlDebug {
+				buf := bytes.NewBufferString("Session.ring:")
+				for _, h := range hosts {
+					buf.WriteString("[" + h.Peer() + ":" + h.State().String() + "]")
+				}
+				log.Println(buf.String())
 			}
-			log.Println(buf.String())
-		}
 
-		for _, h := range hosts {
-			if h.IsUp() {
-				continue
+			for _, h := range hosts {
+				if h.IsUp() {
+					continue
+				}
+				s.handleNodeUp(net.ParseIP(h.Peer()), h.Port(), true)
 			}
-			s.handleNodeUp(net.ParseIP(h.Peer()), h.Port(), true)
+		case <-s.quit:
+			return
 		}
 	}
 }
@@ -341,6 +351,10 @@ func (s *Session) Close() {
 	if s.schemaEvents != nil {
 		s.schemaEvents.stop()
 	}
+
+	if s.quit != nil {
+		close(s.quit)
+	}
 }
 
 func (s *Session) Closed() bool {