Browse Source

Add heartbeat and timeout threshold loggers.

Ben Johnson 12 years ago
parent
commit
88e0263d08
1 changed files with 41 additions and 0 deletions
  1. 41 0
      server/peer_server.go

+ 41 - 0
server/peer_server.go

@@ -42,6 +42,9 @@ type PeerServer struct {
 	RetryTimes       int
 	HeartbeatTimeout time.Duration
 	ElectionTimeout  time.Duration
+
+	closeChan chan bool
+	timeoutThresholdChan chan interface{}
 }
 
 // TODO: find a good policy to do snapshot
@@ -82,6 +85,8 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
 		},
 		HeartbeatTimeout: defaultHeartbeatTimeout,
 		ElectionTimeout:  defaultElectionTimeout,
+
+		timeoutThresholdChan: make(chan interface{}, 1),
 	}
 
 	// Create transporter for raft
@@ -99,6 +104,8 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
 	s.raftServer.AddEventListener(raft.TermChangeEventType, s.raftEventLogger)
 	s.raftServer.AddEventListener(raft.AddPeerEventType, s.raftEventLogger)
 	s.raftServer.AddEventListener(raft.RemovePeerEventType, s.raftEventLogger)
+	s.raftServer.AddEventListener(raft.HeartbeatTimeoutEventType, s.raftEventLogger)
+	s.raftServer.AddEventListener(raft.ElectionTimeoutThresholdEventType, s.raftEventLogger)
 
 	return s
 }
@@ -147,7 +154,10 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
 		log.Debugf("%s restart as a follower", s.name)
 	}
 
+	s.closeChan = make(chan bool)
+
 	go s.monitorSync()
+	go s.monitorTimeoutThreshold(s.closeChan)
 
 	// open the snapshot
 	if snapshot {
@@ -205,6 +215,10 @@ func (s *PeerServer) listenAndServeTLS(certFile, keyFile string) error {
 
 // Stops the server.
 func (s *PeerServer) Close() {
+	if s.closeChan != nil {
+		close(s.closeChan)
+		s.closeChan = nil
+	}
 	if s.listener != nil {
 		s.listener.Close()
 		s.listener = nil
@@ -449,6 +463,18 @@ func (s *PeerServer) raftEventLogger(event raft.Event) {
 		log.Infof("%s: peer added: '%v'", s.name, value)
 	case raft.RemovePeerEventType:
 		log.Infof("%s: peer removed: '%v'", s.name, value)
+	case raft.HeartbeatTimeoutEventType:
+		var name = "<unknown>"
+		if peer, ok := value.(*raft.Peer); ok {
+			name = peer.Name
+		}
+		log.Infof("%s: warning: heartbeat timed out: '%v'", s.name, name)
+	case raft.ElectionTimeoutThresholdEventType:
+		select {
+		case s.timeoutThresholdChan <- value:
+		default:
+		}
+		
 	}
 }
 
@@ -474,3 +500,18 @@ func (s *PeerServer) monitorSync() {
 		}
 	}
 }
+
+// monitorTimeoutThreshold groups timeout threshold events together and prints
+// them as a single log line.
+func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) {
+	for {
+		select {
+		case value := <-s.timeoutThresholdChan:
+			log.Infof("%s: warning: heartbeat near election timeout: %v", s.name, value)
+		case <-closeChan:
+			return
+		}
+
+		time.Sleep(5 * time.Second)
+	}
+}