Browse Source

Merge pull request #698 from unihorn/61

bump(goraft/raft): 585c58026c
Yicheng Qin 11 years ago
parent
commit
2c65e9852c

+ 16 - 0
third_party/github.com/goraft/raft/http_transporter.go

@@ -244,6 +244,10 @@ func (t *HTTPTransporter) appendEntriesHandler(server Server) http.HandlerFunc {
 		}
 
 		resp := server.AppendEntries(req)
+		if resp == nil {
+			http.Error(w, "Failed creating response.", http.StatusInternalServerError)
+			return
+		}
 		if _, err := resp.Encode(w); err != nil {
 			http.Error(w, "", http.StatusInternalServerError)
 			return
@@ -263,6 +267,10 @@ func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc {
 		}
 
 		resp := server.RequestVote(req)
+		if resp == nil {
+			http.Error(w, "Failed creating response.", http.StatusInternalServerError)
+			return
+		}
 		if _, err := resp.Encode(w); err != nil {
 			http.Error(w, "", http.StatusInternalServerError)
 			return
@@ -282,6 +290,10 @@ func (t *HTTPTransporter) snapshotHandler(server Server) http.HandlerFunc {
 		}
 
 		resp := server.RequestSnapshot(req)
+		if resp == nil {
+			http.Error(w, "Failed creating response.", http.StatusInternalServerError)
+			return
+		}
 		if _, err := resp.Encode(w); err != nil {
 			http.Error(w, "", http.StatusInternalServerError)
 			return
@@ -301,6 +313,10 @@ func (t *HTTPTransporter) snapshotRecoveryHandler(server Server) http.HandlerFun
 		}
 
 		resp := server.SnapshotRecoveryRequest(req)
+		if resp == nil {
+			http.Error(w, "Failed creating response.", http.StatusInternalServerError)
+			return
+		}
 		if _, err := resp.Encode(w); err != nil {
 			http.Error(w, "", http.StatusInternalServerError)
 			return

+ 6 - 1
third_party/github.com/goraft/raft/peer.go

@@ -88,7 +88,12 @@ func (p *Peer) setLastActivity(now time.Time) {
 func (p *Peer) startHeartbeat() {
 	p.stopChan = make(chan bool)
 	c := make(chan bool)
-	go p.heartbeat(c)
+
+	p.server.routineGroup.Add(1)
+	go func() {
+		defer p.server.routineGroup.Done()
+		p.heartbeat(c)
+	}()
 	<-c
 }
 

+ 52 - 22
third_party/github.com/goraft/raft/server.go

@@ -55,6 +55,7 @@ const ElectionTimeoutThresholdPercent = 0.8
 var NotLeaderError = errors.New("raft.Server: Not current leader")
 var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
 var CommandTimeoutError = errors.New("raft: Command timeout")
+var StopError = errors.New("raft: Has been stopped")
 
 //------------------------------------------------------------------------------
 //
@@ -123,7 +124,7 @@ type server struct {
 	mutex      sync.RWMutex
 	syncedPeer map[string]bool
 
-	stopped           chan chan bool
+	stopped           chan bool
 	c                 chan *ev
 	electionTimeout   time.Duration
 	heartbeatInterval time.Duration
@@ -140,6 +141,8 @@ type server struct {
 	maxLogEntriesPerRequest uint64
 
 	connectionString string
+
+	routineGroup sync.WaitGroup
 }
 
 // An internal event to be processed by the server's event loop.
@@ -177,7 +180,6 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
 		state:                   Stopped,
 		peers:                   make(map[string]*Peer),
 		log:                     newLog(),
-		stopped:                 make(chan chan bool),
 		c:                       make(chan *ev, 256),
 		electionTimeout:         DefaultElectionTimeout,
 		heartbeatInterval:       DefaultHeartbeatInterval,
@@ -440,6 +442,9 @@ func (s *server) Start() error {
 		return err
 	}
 
+	// stopped needs to be allocated each time server starts
+	// because it is closed at `Stop`.
+	s.stopped = make(chan bool)
 	s.setState(Follower)
 
 	// If no log entries exist then
@@ -457,7 +462,11 @@ func (s *server) Start() error {
 
 	debugln(s.GetState())
 
-	go s.loop()
+	s.routineGroup.Add(1)
+	go func() {
+		defer s.routineGroup.Done()
+		s.loop()
+	}()
 
 	return nil
 }
@@ -507,11 +516,11 @@ func (s *server) Stop() {
 		return
 	}
 
-	stop := make(chan bool)
-	s.stopped <- stop
+	close(s.stopped)
+
+	// make sure all goroutines have stopped before we close the log
+	s.routineGroup.Wait()
 
-	// make sure the server has stopped before we close the log
-	<-stop
 	s.log.close()
 	s.setState(Stopped)
 }
@@ -605,9 +614,17 @@ func (s *server) loop() {
 // until the event is actually processed before returning.
 func (s *server) send(value interface{}) (interface{}, error) {
 	event := &ev{target: value, c: make(chan error, 1)}
-	s.c <- event
-	err := <-event.c
-	return event.returnValue, err
+	select {
+	case s.c <- event:
+	case <-s.stopped:
+		return nil, StopError
+	}
+	select {
+	case <-s.stopped:
+		return nil, StopError
+	case err := <-event.c:
+		return event.returnValue, err
+	}
 }
 
 func (s *server) sendAsync(value interface{}) {
@@ -621,8 +638,13 @@ func (s *server) sendAsync(value interface{}) {
 	default:
 	}
 
+	s.routineGroup.Add(1)
 	go func() {
-		s.c <- event
+		defer s.routineGroup.Done()
+		select {
+		case s.c <- event:
+		case <-s.stopped:
+		}
 	}()
 }
 
@@ -640,9 +662,8 @@ func (s *server) followerLoop() {
 		var err error
 		update := false
 		select {
-		case stop := <-s.stopped:
+		case <-s.stopped:
 			s.setState(Stopped)
-			stop <- true
 			return
 
 		case e := <-s.c:
@@ -717,7 +738,11 @@ func (s *server) candidateLoop() {
 			// Send RequestVote RPCs to all other servers.
 			respChan = make(chan *RequestVoteResponse, len(s.peers))
 			for _, peer := range s.peers {
-				go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
+				s.routineGroup.Add(1)
+				go func(peer *Peer) {
+					defer s.routineGroup.Done()
+					peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
+				}(peer)
 			}
 
 			// Wait for either:
@@ -740,9 +765,8 @@ func (s *server) candidateLoop() {
 
 		// Collect votes from peers.
 		select {
-		case stop := <-s.stopped:
+		case <-s.stopped:
 			s.setState(Stopped)
-			stop <- true
 			return
 
 		case resp := <-respChan:
@@ -786,19 +810,22 @@ func (s *server) leaderLoop() {
 	// "Upon election: send initial empty AppendEntries RPCs (heartbeat) to
 	// each server; repeat during idle periods to prevent election timeouts
 	// (§5.2)". The heartbeats started above do the "idle" period work.
-	go s.Do(NOPCommand{})
+	s.routineGroup.Add(1)
+	go func() {
+		defer s.routineGroup.Done()
+		s.Do(NOPCommand{})
+	}()
 
 	// Begin to collect response from followers
 	for s.State() == Leader {
 		var err error
 		select {
-		case stop := <-s.stopped:
+		case <-s.stopped:
 			// Stop all peers before stop
 			for _, peer := range s.peers {
 				peer.stopHeartbeat(false)
 			}
 			s.setState(Stopped)
-			stop <- true
 			return
 
 		case e := <-s.c:
@@ -826,9 +853,8 @@ func (s *server) snapshotLoop() {
 	for s.State() == Snapshotting {
 		var err error
 		select {
-		case stop := <-s.stopped:
+		case <-s.stopped:
 			s.setState(Stopped)
-			stop <- true
 			return
 
 		case e := <-s.c:
@@ -1109,7 +1135,11 @@ func (s *server) RemovePeer(name string) error {
 			// So we might be holding log lock and waiting for log lock,
 			// which lead to a deadlock.
 			// TODO(xiangli) refactor log lock
-			go peer.stopHeartbeat(true)
+			s.routineGroup.Add(1)
+			go func() {
+				defer s.routineGroup.Done()
+				peer.stopHeartbeat(true)
+			}()
 		}
 
 		delete(s.peers, name)