@@ -119,6 +119,7 @@ type server struct {
	mutex sync.RWMutex
	syncedPeer map[string]bool

+	stopped chan bool
	c chan *ev
	electionTimeout time.Duration
	heartbeatTimeout time.Duration
@@ -166,6 +167,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
		state: Stopped,
		peers: make(map[string]*Peer),
		log: newLog(),
+		stopped: make(chan bool),
		c: make(chan *ev, 256),
		electionTimeout: DefaultElectionTimeout,
		heartbeatTimeout: DefaultHeartbeatTimeout,
@@ -279,6 +281,7 @@ func (s *server) setState(state string) {
	s.state = state
	if state == Leader {
		s.leader = s.Name()
+		s.syncedPeer = make(map[string]bool)
	}

	// Dispatch state and leader change events.
@@ -463,8 +466,9 @@ func (s *server) Start() error {
// Shuts down the server.
func (s *server) Stop() {
	s.send(&stopValue)
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
+
+	// make sure the server has stopped before we close the log
+	<-s.stopped
	s.log.close()
}

@@ -553,6 +557,7 @@ func (s *server) loop() {
			s.snapshotLoop()

		case Stopped:
+			s.stopped <- true
			return
		}
	}
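A note on the shutdown change: previously `Stop()` raced the event loop, taking `s.mutex` and closing the log while `loop()` could still be processing events against it. The unbuffered `stopped` channel turns shutdown into a handshake: `Stop()` blocks until the loop acknowledges the stop event and returns, and only then closes the log. A minimal, self-contained sketch of the pattern (names are illustrative, not from the patch):

```go
package main

import "fmt"

type worker struct {
	events  chan string
	stopped chan bool
}

func (w *worker) loop() {
	for e := range w.events {
		if e == "stop" {
			w.stopped <- true // acknowledge: the loop is done
			return
		}
		fmt.Println("handled:", e)
	}
}

func (w *worker) Stop() {
	w.events <- "stop"
	<-w.stopped // block until the loop has actually exited
}

func main() {
	w := &worker{events: make(chan string, 1), stopped: make(chan bool)}
	go w.loop()
	w.events <- "job"
	w.Stop()
}
```

Because `stopped` is unbuffered, the send in the loop and the receive in `Stop()` rendezvous exactly once, so the loop is guaranteed to be past any use of shared state before teardown proceeds.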
@@ -561,15 +566,26 @@
// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *server) send(value interface{}) (interface{}, error) {
-	event := s.sendAsync(value)
+	event := &ev{target: value, c: make(chan error, 1)}
+	s.c <- event
	err := <-event.c
	return event.returnValue, err
}

-func (s *server) sendAsync(value interface{}) *ev {
+func (s *server) sendAsync(value interface{}) {
	event := &ev{target: value, c: make(chan error, 1)}
-	s.c <- event
-	return event
+	// try a non-blocking send first;
+	// in most cases this should not block,
+	// avoiding the creation of unnecessary goroutines
+	select {
+	case s.c <- event:
+		return
+	default:
+	}
+
+	go func() {
+		s.c <- event
+	}()
}

// The event loop that is run when the server is in a Follower state.
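The reworked `sendAsync` takes a fast-path/slow-path shape: a non-blocking `select` tries the buffered channel first, and a goroutine is spawned only when the buffer is full, rather than one per call. A generic sketch of the pattern (the `enqueue` helper is hypothetical, not part of the patch):

```go
// enqueue delivers v to a buffered channel without ever blocking
// the caller.
func enqueue(ch chan<- int, v int) {
	// fast path: the buffer has room, deliver inline
	select {
	case ch <- v:
		return
	default:
	}
	// slow path: the buffer is full, hand off to a goroutine so a
	// busy consumer can never block the producer
	go func() { ch <- v }()
}
```

One trade-off worth noting: values that take the slow path may be delivered out of order relative to later fast-path sends.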
@@ -578,7 +594,6 @@ func (s *server) sendAsync(value interface{}) {
// 1.Receiving valid AppendEntries RPC, or
// 2.Granting vote to candidate
func (s *server) followerLoop() {
-
	s.setState(Follower)
	since := time.Now()
	electionTimeout := s.ElectionTimeout()
@@ -739,7 +754,6 @@ func (s *server) candidateLoop() {
// The event loop that is run when the server is in a Leader state.
func (s *server) leaderLoop() {
	s.setState(Leader)
-	s.syncedPeer = make(map[string]bool)
	logIndex, _ := s.log.lastInfo()

	// Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
@@ -786,6 +800,7 @@ func (s *server) leaderLoop() {
	for _, peer := range s.peers {
		peer.stopHeartbeat(false)
	}
+
	s.syncedPeer = nil
}

@@ -851,19 +866,12 @@ func (s *server) processCommand(command Command, e *ev) {
		return
	}

-	// Issue an append entries response for the server.
-	resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex())
-	resp.append = true
-	resp.peer = s.Name()
-
-	// this must be async
-	// sendAsync is not really async every time
-	// when the sending speed of the user is larger than
-	// the processing speed of the server, the buffered channel
-	// will be full. Then sendAsync will become sync, which will
-	// cause deadlock here.
-	// so we use a goroutine to avoid the deadlock
-	go s.sendAsync(resp)
+	s.syncedPeer[s.Name()] = true
+	if len(s.peers) == 0 {
+		commitIndex := s.log.currentIndex()
+		s.log.setCommitIndex(commitIndex)
+		s.debugln("commit index ", commitIndex)
+	}
}

//--------------------------------------
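In `processCommand`, routing a synthetic `AppendEntriesResponse` back through the event channel is exactly what the deleted comment block was working around; marking the leader as synced directly removes the self-send, and with it the deadlock workaround. The `len(s.peers) == 0` fast path falls out of Raft's majority rule: a single-node cluster is its own quorum. A hypothetical helper illustrating the arithmetic (goraft's actual quorum helper may be named differently):

```go
// quorumSize returns the strict majority of the cluster, where
// peerCount excludes the leader itself. (Hypothetical helper.)
func quorumSize(peerCount int) int {
	return (peerCount+1)/2 + 1
}
```

With zero peers, `quorumSize(0) == 1`: the leader alone is a majority, so a freshly appended entry is immediately committable, which is what the new branch does.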
@@ -879,7 +887,6 @@ func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse

// Processes the "append entries" request.
func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
-
	s.traceln("server.ae.process")

	if req.Term < s.currentTerm {
@@ -908,7 +915,7 @@ func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

-	// once the server appended and commited all the log entries from the leader
+	// once the server has appended and committed all the log entries from the leader

	return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
}
@@ -953,6 +960,8 @@ func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
	committedIndex := s.log.commitIndex

	if commitIndex > committedIndex {
+		// the leader needs to fsync its log before committing new entries
+		s.log.sync()
		s.log.setCommitIndex(commitIndex)
		s.debugln("commit index ", commitIndex)
	}
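The new `s.log.sync()` call enforces Raft's durability rule: an entry may only count toward commitment once it is on stable storage, so the leader flushes before advancing the commit index. Assuming the log is backed by an `*os.File`, a minimal sketch of the sync-then-commit step (package, field, and method names are illustrative):

```go
package raftlog // illustrative

import "os"

// diskLog sketches a log backed by a single *os.File.
type diskLog struct {
	file        *os.File
	commitIndex uint64
}

// setCommitIndexSynced flushes buffered writes to stable storage
// before exposing a higher commit index, mirroring s.log.sync()
// followed by s.log.setCommitIndex() in the hunk above.
func (l *diskLog) setCommitIndexSynced(index uint64) error {
	if err := l.file.Sync(); err != nil { // fsync(2) the log file
		return err
	}
	l.commitIndex = index
	return nil
}
```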
@@ -976,7 +985,7 @@ func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot

	// If the request is coming from an old term then reject it.
	if req.Term < s.Term() {
-		s.debugln("server.rv.error: stale term")
+		s.debugln("server.rv.deny.vote: stale term")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

@@ -984,7 +993,7 @@ func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot

	// If we've already voted for a different candidate then don't vote for this candidate.
	if s.votedFor != "" && s.votedFor != req.CandidateName {
-		s.debugln("server.rv.error: duplicate vote: ", req.CandidateName,
+		s.debugln("server.rv.deny.vote: duplicate vote: ", req.CandidateName,
			" already vote for ", s.votedFor)
		return newRequestVoteResponse(s.currentTerm, false), false
	}
@@ -992,7 +1001,7 @@ func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
	// If the candidate's log is not at least as up-to-date as our last log then don't vote.
	lastIndex, lastTerm := s.log.lastInfo()
	if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
-		s.debugln("server.rv.error: out of date log: ", req.CandidateName,
+		s.debugln("server.rv.deny.vote: out-of-date log: ", req.CandidateName,
			"Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
			"Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
		return newRequestVoteResponse(s.currentTerm, false), false
@@ -1322,7 +1331,7 @@ func (s *server) writeConf() {
	confPath := path.Join(s.path, "conf")
	tmpConfPath := path.Join(s.path, "conf.tmp")

-	err := ioutil.WriteFile(tmpConfPath, b, 0600)
+	err := writeFileSynced(tmpConfPath, b, 0600)

	if err != nil {
		panic(err)
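`writeFileSynced` itself is not shown in this diff. A plausible implementation mirrors `ioutil.WriteFile` but calls `Sync` before closing, so the temporary conf file is durable before it is renamed over `conf` (hypothetical reconstruction; the real helper is defined elsewhere in the patch):

```go
import "os"

// writeFileSynced writes data to filename like ioutil.WriteFile,
// but fsyncs the file before closing it.
func writeFileSynced(filename string, data []byte, perm os.FileMode) error {
	f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil { // force the data to stable storage
		f.Close()
		return err
	}
	return f.Close()
}
```

Without the fsync, a crash shortly after the rename could leave `conf` pointing at data the kernel never flushed.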
@@ -1359,9 +1368,13 @@ func (s *server) readConf() error {
//--------------------------------------

func (s *server) debugln(v ...interface{}) {
-	debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...))
+	if logLevel > Debug {
+		debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...))
+	}
}

func (s *server) traceln(v ...interface{}) {
-	tracef("[%s] %s", s.name, fmt.Sprintln(v...))
+	if logLevel > Trace {
+		tracef("[%s] %s", s.name, fmt.Sprintln(v...))
+	}
}