Browse Source

Merge pull request #675 from unihorn/56

fix(peer_server): exit all server goroutines in Stop()
Yicheng Qin 11 years ago
parent
commit
d78116c35b
2 changed files with 60 additions and 27 deletions
  1. 10 7
      discovery/discovery.go
  2. 50 20
      server/peer_server.go

+ 10 - 7
discovery/discovery.go

@@ -33,7 +33,7 @@ func init() {
 	defaultDiscoverer = &Discoverer{}
 }
 
-func (d *Discoverer) Do(discoveryURL string, name string, peer string) (peers []string, err error) {
+func (d *Discoverer) Do(discoveryURL string, name string, peer string, closeChan <-chan bool, startRoutine func(func())) (peers []string, err error) {
 	d.name = name
 	d.peer = peer
 	d.discoveryURL = discoveryURL
@@ -68,7 +68,7 @@ func (d *Discoverer) Do(discoveryURL string, name string, peer string) (peers []
 
 	// Start the very slow heartbeat to the cluster now in anticipation
 	// that everything is going to go alright now
-	go d.startHeartbeat()
+	startRoutine(func() { d.startHeartbeat(closeChan) })
 
 	// Attempt to take the leadership role, if there is no error we are it!
 	resp, err := d.client.Create(path.Join(d.prefix, stateKey), startedState, 0)
@@ -120,17 +120,20 @@ func (d *Discoverer) findPeers() (peers []string, err error) {
 	return
 }
 
-func (d *Discoverer) startHeartbeat() {
+func (d *Discoverer) startHeartbeat(closeChan <-chan bool) {
 	// In case of errors we should attempt to heartbeat fairly frequently
 	heartbeatInterval := defaultTTL / 8
-	ticker := time.Tick(time.Second * time.Duration(heartbeatInterval))
+	ticker := time.NewTicker(time.Second * time.Duration(heartbeatInterval))
+	defer ticker.Stop()
 	for {
 		select {
-		case <-ticker:
+		case <-ticker.C:
 			err := d.heartbeat()
 			if err != nil {
 				log.Warnf("Discovery heartbeat failed: %v", err)
 			}
+		case <-closeChan:
+			return
 		}
 	}
 }
@@ -140,6 +143,6 @@ func (d *Discoverer) heartbeat() error {
 	return err
 }
 
-func Do(discoveryURL string, name string, peer string) ([]string, error) {
-	return defaultDiscoverer.Do(discoveryURL, name, peer)
+func Do(discoveryURL string, name string, peer string, closeChan <-chan bool, startRoutine func(func())) ([]string, error) {
+	return defaultDiscoverer.Do(discoveryURL, name, peer, closeChan, startRoutine)
 }

+ 50 - 20
server/peer_server.go

@@ -68,6 +68,7 @@ type PeerServer struct {
 	mode           Mode
 
 	closeChan            chan bool
+	routineGroup         sync.WaitGroup
 	timeoutThresholdChan chan interface{}
 
 	standbyPeerURL   string
@@ -293,14 +294,14 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er
 
 	s.closeChan = make(chan bool)
 
-	go s.monitorSync()
-	go s.monitorTimeoutThreshold(s.closeChan)
-	go s.monitorActiveSize(s.closeChan)
-	go s.monitorPeerActivity(s.closeChan)
+	s.startRoutine(s.monitorSync)
+	s.startRoutine(s.monitorTimeoutThreshold)
+	s.startRoutine(s.monitorActiveSize)
+	s.startRoutine(s.monitorPeerActivity)
 
 	// open the snapshot
 	if snapshot {
-		go s.monitorSnapshot()
+		s.startRoutine(s.monitorSnapshot)
 	}
 
 	return nil
@@ -312,9 +313,10 @@ func (s *PeerServer) Stop() {
 
 	if s.closeChan != nil {
 		close(s.closeChan)
-		s.closeChan = nil
 	}
 	s.raftServer.Stop()
+	s.routineGroup.Wait()
+	s.closeChan = nil
 }
 
 func (s *PeerServer) HTTPHandler() http.Handler {
@@ -454,7 +456,7 @@ func (s *PeerServer) checkPeerAddressNonconflict() bool {
 
 // Helper function to do discovery and return results in expected format
 func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) {
-	peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL)
+	peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL, s.closeChan, s.startRoutine)
 
 	// Warn about errors coming from discovery, this isn't fatal
 	// since the user might have provided a peer list elsewhere,
@@ -698,9 +700,24 @@ func (s *PeerServer) logSnapshot(err error, currentIndex, count uint64) {
 	}
 }
 
+func (s *PeerServer) startRoutine(f func()) {
+	s.routineGroup.Add(1)
+	go func() {
+		defer s.routineGroup.Done()
+		f()
+	}()
+}
+
 func (s *PeerServer) monitorSnapshot() {
 	for {
-		time.Sleep(s.snapConf.checkingInterval)
+		timer := time.NewTimer(s.snapConf.checkingInterval)
+		defer timer.Stop()
+		select {
+		case <-s.closeChan:
+			return
+		case <-timer.C:
+		}
+
 		currentIndex := s.RaftServer().CommitIndex()
 		count := currentIndex - s.snapConf.lastIndex
 		if uint64(count) > s.snapConf.snapshotThr {
@@ -712,10 +729,13 @@ func (s *PeerServer) monitorSnapshot() {
 }
 
 func (s *PeerServer) monitorSync() {
-	ticker := time.Tick(time.Millisecond * 500)
+	ticker := time.NewTicker(time.Millisecond * 500)
+	defer ticker.Stop()
 	for {
 		select {
-		case now := <-ticker:
+		case <-s.closeChan:
+			return
+		case now := <-ticker.C:
 			if s.raftServer.State() == raft.Leader {
 				s.raftServer.Do(s.store.CommandFactory().CreateSyncCommand(now))
 			}
@@ -725,27 +745,35 @@ func (s *PeerServer) monitorSync() {
 
 // monitorTimeoutThreshold groups timeout threshold events together and prints
 // them as a single log line.
-func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) {
+func (s *PeerServer) monitorTimeoutThreshold() {
 	for {
 		select {
+		case <-s.closeChan:
+			return
 		case value := <-s.timeoutThresholdChan:
 			log.Infof("%s: warning: heartbeat near election timeout: %v", s.Config.Name, value)
-		case <-closeChan:
-			return
 		}
 
-		time.Sleep(ThresholdMonitorTimeout)
+		timer := time.NewTimer(ThresholdMonitorTimeout)
+		defer timer.Stop()
+		select {
+		case <-s.closeChan:
+			return
+		case <-timer.C:
+		}
 	}
 }
 
 // monitorActiveSize has the leader periodically check the status of cluster
 // nodes and swaps them out for standbys as needed.
-func (s *PeerServer) monitorActiveSize(closeChan chan bool) {
+func (s *PeerServer) monitorActiveSize() {
 	for {
+		timer := time.NewTimer(ActiveMonitorTimeout)
+		defer timer.Stop()
 		select {
-		case <-time.After(ActiveMonitorTimeout):
-		case <-closeChan:
+		case <-s.closeChan:
 			return
+		case <-timer.C:
 		}
 
 		// Ignore while this peer is not a leader.
@@ -802,12 +830,14 @@ func (s *PeerServer) monitorActiveSize(closeChan chan bool) {
 }
 
 // monitorPeerActivity has the leader periodically check for dead nodes and demotes them.
-func (s *PeerServer) monitorPeerActivity(closeChan chan bool) {
+func (s *PeerServer) monitorPeerActivity() {
 	for {
+		timer := time.NewTimer(PeerActivityMonitorTimeout)
+		defer timer.Stop()
 		select {
-		case <-time.After(PeerActivityMonitorTimeout):
-		case <-closeChan:
+		case <-s.closeChan:
 			return
+		case <-timer.C:
 		}
 
 		// Ignore while this peer is not a leader.