Преглед на файлове

Merge pull request #408 from zachbadgett/ring-describer-cleanup

Added closeChan to clean up ring describer when session is closed
Chris Bannister преди 10 години
родител
ревизия
179b5948e3
променени са 3 файла, в които са добавени 26 реда и са изтрити 15 реда
  1. 1 0
      AUTHORS
  2. 17 13
      host_source.go
  3. 8 2
      session.go

+ 1 - 0
AUTHORS

@@ -45,3 +45,4 @@ Dan Kinder <dkinder.is.me@gmail.com>
 Oliver Beattie <oliver@obeattie.com>
 Justin Corpron <justin@retailnext.com>
 Miles Delahunty <miles.delahunty@gmail.com>
+Zach Badgett <zach.badgett@gmail.com>

+ 17 - 13
host_source.go

@@ -21,6 +21,7 @@ type ringDescriber struct {
 	prevHosts       []HostInfo
 	prevPartitioner string
 	session         *Session
+	closeChan       chan bool
 }
 
 func (r *ringDescriber) GetHosts() (
@@ -96,20 +97,23 @@ func (h *ringDescriber) run(sleep time.Duration) {
 	}
 
 	for {
-		// if we have 0 hosts this will return the previous list of hosts to
-		// attempt to reconnect to the cluster otherwise we would never find
-		// downed hosts again, could possibly have an optimisation to only
-		// try to add new hosts if GetHosts didnt error and the hosts didnt change.
-		hosts, partitioner, err := h.GetHosts()
-		if err != nil {
-			log.Println("RingDescriber: unable to get ring topology:", err)
-		} else {
-			h.session.Pool.SetHosts(hosts)
-			if v, ok := h.session.Pool.(SetPartitioner); ok {
-				v.SetPartitioner(partitioner)
+		select {
+		case <-time.After(sleep):
+			// if we have 0 hosts this will return the previous list of hosts to
+			// attempt to reconnect to the cluster otherwise we would never find
+			// downed hosts again, could possibly have an optimisation to only
+			// try to add new hosts if GetHosts didnt error and the hosts didnt change.
+			hosts, partitioner, err := h.GetHosts()
+			if err != nil {
+				log.Println("RingDescriber: unable to get ring topology:", err)
+			} else {
+				h.session.Pool.SetHosts(hosts)
+				if v, ok := h.session.Pool.(SetPartitioner); ok {
+					v.SetPartitioner(partitioner)
+				}
 			}
+		case <-h.closeChan:
+			return
 		}
-
-		time.Sleep(sleep)
 	}
 }

+ 8 - 2
session.go

@@ -35,6 +35,7 @@ type Session struct {
 	routingKeyInfoCache routingKeyInfoLRU
 	schemaDescriber     *schemaDescriber
 	trace               Tracer
+	hostSource          *ringDescriber
 	mu                  sync.RWMutex
 
 	cfg ClusterConfig
@@ -84,13 +85,14 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 		s.SetPageSize(cfg.PageSize)
 
 		if cfg.DiscoverHosts {
-			hostSource := &ringDescriber{
+			s.hostSource = &ringDescriber{
 				session:    s,
 				dcFilter:   cfg.Discovery.DcFilter,
 				rackFilter: cfg.Discovery.RackFilter,
+				closeChan:  make(chan bool),
 			}
 
-			go hostSource.run(cfg.Discovery.Sleep)
+			go s.hostSource.run(cfg.Discovery.Sleep)
 		}
 
 		return s, nil
@@ -184,6 +186,10 @@ func (s *Session) Close() {
 	s.isClosed = true
 
 	s.Pool.Close()
+
+	if s.hostSource != nil {
+		close(s.hostSource.closeChan)
+	}
 }
 
 func (s *Session) Closed() bool {