Browse Source

bump(goraft/raft): c76c5d95

Yicheng Qin 11 years ago
parent
commit
cbb706cd47

+ 3 - 1
third_party/github.com/goraft/raft/command.go

@@ -56,7 +56,9 @@ func newCommand(name string, data []byte) (Command, error) {
 				return nil, err
 			}
 		} else {
-			json.NewDecoder(bytes.NewReader(data)).Decode(copy)
+			if err := json.NewDecoder(bytes.NewReader(data)).Decode(copy); err != nil {
+				return nil, err
+			}
 		}
 	}
 

+ 3 - 1
third_party/github.com/goraft/raft/log_entry.go

@@ -29,7 +29,9 @@ func newLogEntry(log *Log, event *ev, index uint64, term uint64, command Command
 				return nil, err
 			}
 		} else {
-			json.NewEncoder(&buf).Encode(command)
+			if err := json.NewEncoder(&buf).Encode(command); err != nil {
+				return nil, err
+			}
 		}
 	}
 

+ 4 - 0
third_party/github.com/goraft/raft/peer.go

@@ -89,6 +89,8 @@ func (p *Peer) startHeartbeat() {
 	p.stopChan = make(chan bool)
 	c := make(chan bool)
 
+	p.setLastActivity(time.Now())
+
 	p.server.routineGroup.Add(1)
 	go func() {
 		defer p.server.routineGroup.Done()
@@ -99,6 +101,8 @@ func (p *Peer) startHeartbeat() {
 
 // Stops the peer heartbeat.
 func (p *Peer) stopHeartbeat(flush bool) {
+	p.setLastActivity(time.Time{})
+
 	p.stopChan <- flush
 }
 

+ 13 - 1
third_party/github.com/goraft/raft/server.go

@@ -334,6 +334,8 @@ func (s *server) IsLogEmpty() bool {
 
 // A list of all the log entries. This should only be used for debugging purposes.
 func (s *server) LogEntries() []*LogEntry {
+	s.log.mutex.RLock()
+	defer s.log.mutex.RUnlock()
 	return s.log.entries
 }
 
@@ -471,7 +473,9 @@ func (s *server) Start() error {
 	return nil
 }
 
-// Init initializes the raft server
+// Init initializes the raft server.
+// If there is no previous log file under the given path, Init() will create an empty log file.
+// Otherwise, Init() will load in the log entries from the log file.
 func (s *server) Init() error {
 	if s.Running() {
 		return fmt.Errorf("raft.Server: Server already running[%v]", s.state)
@@ -613,6 +617,10 @@ func (s *server) loop() {
 // Sends an event to the event loop to be processed. The function will wait
 // until the event is actually processed before returning.
 func (s *server) send(value interface{}) (interface{}, error) {
+	if !s.Running() {
+		return nil, StopError
+	}
+
 	event := &ev{target: value, c: make(chan error, 1)}
 	select {
 	case s.c <- event:
@@ -628,6 +636,10 @@ func (s *server) send(value interface{}) (interface{}, error) {
 }
 
 func (s *server) sendAsync(value interface{}) {
+	if !s.Running() {
+		return
+	}
+
 	event := &ev{target: value, c: make(chan error, 1)}
 	// try a non-blocking send first
 	// in most cases, this should not be blocking