@@ -21,6 +21,7 @@ import (

 const (
 	Stopped = "stopped"
+	Initialized = "initialized"
 	Follower = "follower"
 	Candidate = "candidate"
 	Leader = "leader"
@@ -45,8 +46,6 @@ const (
 // election timeout.
 const ElectionTimeoutThresholdPercent = 0.8

-var stopValue interface{}
-
 //------------------------------------------------------------------------------
 //
 // Errors
@@ -96,6 +95,7 @@ type Server interface {
 	AddPeer(name string, connectiongString string) error
 	RemovePeer(name string) error
 	Peers() map[string]*Peer
+	Init() error
 	Start() error
 	Stop()
 	Running() bool
@@ -103,6 +103,7 @@ type Server interface {
 	TakeSnapshot() error
 	LoadSnapshot() error
 	AddEventListener(string, EventListener)
+	FlushCommitIndex()
 }

 type server struct {
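// Illustrative sketch (not part of the patch): with Init() split out of Start(),
// a caller can load the log, config, and snapshot directory up front and handle
// initialization errors separately before entering the follower loop. Start()
// still calls Init() itself, so calling Init() first is optional. The helper
// name below is hypothetical; only the call order comes from this change.
func bootRaftServer(s Server) error {
	if err := s.Init(); err != nil { // safe to call again: a second call is a no-op once Initialized
		return err
	}
	return s.Start()
}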
@@ -122,13 +123,19 @@ type server struct {
 	mutex sync.RWMutex
 	syncedPeer map[string]bool

-	stopped chan bool
+	stopped chan chan bool
 	c chan *ev
 	electionTimeout time.Duration
 	heartbeatInterval time.Duration

-	currentSnapshot *Snapshot
-	lastSnapshot *Snapshot
+	snapshot *Snapshot
+
+	// pendingSnapshot is an unfinished snapshot.
+	// After the pendingSnapshot is saved to disk,
+	// it is promoted to snapshot and the pending
+	// slot is set back to nil.
+	pendingSnapshot *Snapshot
+
 	stateMachine StateMachine
 	maxLogEntriesPerRequest uint64
@@ -170,7 +177,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
 		state: Stopped,
 		peers: make(map[string]*Peer),
 		log: newLog(),
-		stopped: make(chan bool),
+		stopped: make(chan chan bool),
 		c: make(chan *ev, 256),
 		electionTimeout: DefaultElectionTimeout,
 		heartbeatInterval: DefaultHeartbeatInterval,
@@ -292,9 +299,8 @@ func (s *server) setState(state string) {
 	}

 	// Dispatch state and leader change events.
-	if prevState != state {
-		s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
-	}
+	s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
+
 	if prevLeader != s.leader {
 		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
 	}
@@ -419,35 +425,21 @@ func init() {
 	RegisterCommand(&DefaultLeaveCommand{})
 }

-// Start as follow
+// Start the raft server
 // If log entries exist then allow promotion to candidate if no AEs received.
 // If no log entries exist then wait for AEs from another node.
 // If no log entries exist and a self-join command is issued then
 // immediately become leader and commit entry.
-
 func (s *server) Start() error {
 	// Exit if the server is already running.
-	if s.State() != Stopped {
-		return errors.New("raft.Server: Server already running")
+	if s.Running() {
+		return fmt.Errorf("raft.Server: Server already running[%v]", s.state)
 	}

-	// Create snapshot directory if not exist
-	os.Mkdir(path.Join(s.path, "snapshot"), 0700)
-
-	if err := s.readConf(); err != nil {
-		s.debugln("raft: Conf file error: ", err)
-		return fmt.Errorf("raft: Initialization error: %s", err)
-	}
-
-	// Initialize the log and load it up.
-	if err := s.log.open(s.LogPath()); err != nil {
-		s.debugln("raft: Log error: ", err)
-		return fmt.Errorf("raft: Initialization error: %s", err)
+	if err := s.Init(); err != nil {
+		return err
 	}

-	// Update the term to the last term in the log.
-	_, s.currentTerm = s.log.lastInfo()
-
 	s.setState(Follower)

 	// If no log entries exist then
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// Init initializes the raft server
|
|
|
|
|
+func (s *server) Init() error {
|
|
|
|
|
+ if s.Running() {
|
|
|
|
|
+ return fmt.Errorf("raft.Server: Server already running[%v]", s.state)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // server has been initialized or server was stopped after initialized
|
|
|
|
|
+ if s.state == Initialized || !s.log.isEmpty() {
|
|
|
|
|
+ s.state = Initialized
|
|
|
|
|
+ return nil
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Create snapshot directory if it does not exist
|
|
|
|
|
+ err := os.Mkdir(path.Join(s.path, "snapshot"), 0700)
|
|
|
|
|
+ if err != nil && !os.IsExist(err) {
|
|
|
|
|
+ s.debugln("raft: Snapshot dir error: ", err)
|
|
|
|
|
+ return fmt.Errorf("raft: Initialization error: %s", err)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if err := s.readConf(); err != nil {
|
|
|
|
|
+ s.debugln("raft: Conf file error: ", err)
|
|
|
|
|
+ return fmt.Errorf("raft: Initialization error: %s", err)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Initialize the log and load it up.
|
|
|
|
|
+ if err := s.log.open(s.LogPath()); err != nil {
|
|
|
|
|
+ s.debugln("raft: Log error: ", err)
|
|
|
|
|
+ return fmt.Errorf("raft: Initialization error: %s", err)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Update the term to the last term in the log.
|
|
|
|
|
+ _, s.currentTerm = s.log.lastInfo()
|
|
|
|
|
+
|
|
|
|
|
+ s.state = Initialized
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// Shuts down the server.
|
|
// Shuts down the server.
|
|
|
func (s *server) Stop() {
|
|
func (s *server) Stop() {
|
|
|
- s.send(&stopValue)
|
|
|
|
|
|
|
+ stop := make(chan bool)
|
|
|
|
|
+ s.stopped <- stop
|
|
|
|
|
+ s.state = Stopped
|
|
|
|
|
|
|
|
// make sure the server has stopped before we close the log
|
|
// make sure the server has stopped before we close the log
|
|
|
- <-s.stopped
|
|
|
|
|
|
|
+ <-stop
|
|
|
s.log.close()
|
|
s.log.close()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
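// Illustrative sketch (not part of the patch): the stopped field changes from
// chan bool to chan chan bool so that Stop() can hand the event loop an
// acknowledgement channel and block until the loop has actually exited before
// closing the log. A minimal standalone version of the same idiom:
package main

import "fmt"

type worker struct {
	stopped chan chan bool
}

func (w *worker) loop() {
	for {
		select {
		case ack := <-w.stopped:
			// do any cleanup here, then acknowledge and exit the loop
			ack <- true
			return
		}
	}
}

// Stop returns only after loop() has acknowledged, mirroring how the patched
// Stop() waits on <-stop before calling s.log.close().
func (w *worker) Stop() {
	ack := make(chan bool)
	w.stopped <- ack
	<-ack
}

func main() {
	w := &worker{stopped: make(chan chan bool)}
	go w.loop()
	w.Stop()
	fmt.Println("worker stopped")
}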
@@ -483,55 +514,50 @@ func (s *server) Stop() {
 func (s *server) Running() bool {
 	s.mutex.RLock()
 	defer s.mutex.RUnlock()
-	return s.state != Stopped
+	return (s.state != Stopped && s.state != Initialized)
 }

 //--------------------------------------
 // Term
 //--------------------------------------

-// Sets the current term for the server. This is only used when an external
-// current term is found.
-func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) {
+// updates the current term for the server. This is only used when a larger
+// external term is found.
+func (s *server) updateCurrentTerm(term uint64, leaderName string) {
+	_assert(term > s.currentTerm,
+		"updateCurrentTerm: update is called when term is not larger than currentTerm")
+
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
-
 	// Store previous values temporarily.
-	prevState := s.state
 	prevTerm := s.currentTerm
 	prevLeader := s.leader

-	if term > s.currentTerm {
-		// stop heartbeats before step-down
-		if s.state == Leader {
-			s.mutex.Unlock()
-			for _, peer := range s.peers {
-				peer.stopHeartbeat(false)
-			}
-			s.mutex.Lock()
+	// set currentTerm = T, convert to follower (§5.1)
+	// stop heartbeats before step-down
+	if s.state == Leader {
+		s.mutex.Unlock()
+		for _, peer := range s.peers {
+			peer.stopHeartbeat(false)
 		}
-		// update the term and clear vote for
-		s.state = Follower
-		s.currentTerm = term
-		s.leader = leaderName
-		s.votedFor = ""
-	} else if term == s.currentTerm && s.state != Leader && append {
-		// discover new leader when candidate
-		// save leader name when follower
-		s.state = Follower
-		s.leader = leaderName
+		s.mutex.Lock()
 	}
+	// update the term and clear vote for
+	if s.state != Follower {
+		s.mutex.Unlock()
+		s.setState(Follower)
+		s.mutex.Lock()
+	}
+	s.currentTerm = term
+	s.leader = leaderName
+	s.votedFor = ""

 	// Dispatch change events.
-	if prevState != s.state {
-		s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
-	}
+	s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm))
+
 	if prevLeader != s.leader {
 		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
 	}
-	if prevTerm != s.currentTerm {
-		s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm))
-	}
 }

 //--------------------------------------
|
|
|
func (s *server) loop() {
|
|
func (s *server) loop() {
|
|
|
defer s.debugln("server.loop.end")
|
|
defer s.debugln("server.loop.end")
|
|
|
|
|
|
|
|
- for {
|
|
|
|
|
|
|
+ for s.state != Stopped {
|
|
|
state := s.State()
|
|
state := s.State()
|
|
|
|
|
|
|
|
s.debugln("server.loop.run ", state)
|
|
s.debugln("server.loop.run ", state)
|
|
|
switch state {
|
|
switch state {
|
|
|
case Follower:
|
|
case Follower:
|
|
|
s.followerLoop()
|
|
s.followerLoop()
|
|
|
-
|
|
|
|
|
case Candidate:
|
|
case Candidate:
|
|
|
s.candidateLoop()
|
|
s.candidateLoop()
|
|
|
-
|
|
|
|
|
case Leader:
|
|
case Leader:
|
|
|
s.leaderLoop()
|
|
s.leaderLoop()
|
|
|
-
|
|
|
|
|
case Snapshotting:
|
|
case Snapshotting:
|
|
|
s.snapshotLoop()
|
|
s.snapshotLoop()
|
|
|
-
|
|
|
|
|
- case Stopped:
|
|
|
|
|
- s.stopped <- true
|
|
|
|
|
- return
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -609,7 +628,6 @@ func (s *server) sendAsync(value interface{}) {
 // 1.Receiving valid AppendEntries RPC, or
 // 2.Granting vote to candidate
 func (s *server) followerLoop() {
-	s.setState(Follower)
 	since := time.Now()
 	electionTimeout := s.ElectionTimeout()
 	timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
|
|
|
var err error
|
|
var err error
|
|
|
update := false
|
|
update := false
|
|
|
select {
|
|
select {
|
|
|
|
|
+ case stop := <-s.stopped:
|
|
|
|
|
+ s.setState(Stopped)
|
|
|
|
|
+ stop <- true
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
case e := <-s.c:
|
|
case e := <-s.c:
|
|
|
- if e.target == &stopValue {
|
|
|
|
|
- s.setState(Stopped)
|
|
|
|
|
- } else {
|
|
|
|
|
- switch req := e.target.(type) {
|
|
|
|
|
- case JoinCommand:
|
|
|
|
|
- //If no log entries exist and a self-join command is issued
|
|
|
|
|
- //then immediately become leader and commit entry.
|
|
|
|
|
- if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
|
|
|
|
|
- s.debugln("selfjoin and promote to leader")
|
|
|
|
|
- s.setState(Leader)
|
|
|
|
|
- s.processCommand(req, e)
|
|
|
|
|
- } else {
|
|
|
|
|
- err = NotLeaderError
|
|
|
|
|
- }
|
|
|
|
|
- case *AppendEntriesRequest:
|
|
|
|
|
- // If heartbeats get too close to the election timeout then send an event.
|
|
|
|
|
- elapsedTime := time.Now().Sub(since)
|
|
|
|
|
- if elapsedTime > time.Duration(float64(electionTimeout)*ElectionTimeoutThresholdPercent) {
|
|
|
|
|
- s.DispatchEvent(newEvent(ElectionTimeoutThresholdEventType, elapsedTime, nil))
|
|
|
|
|
- }
|
|
|
|
|
- e.returnValue, update = s.processAppendEntriesRequest(req)
|
|
|
|
|
- case *RequestVoteRequest:
|
|
|
|
|
- e.returnValue, update = s.processRequestVoteRequest(req)
|
|
|
|
|
- case *SnapshotRequest:
|
|
|
|
|
- e.returnValue = s.processSnapshotRequest(req)
|
|
|
|
|
- default:
|
|
|
|
|
|
|
+ switch req := e.target.(type) {
|
|
|
|
|
+ case JoinCommand:
|
|
|
|
|
+ //If no log entries exist and a self-join command is issued
|
|
|
|
|
+ //then immediately become leader and commit entry.
|
|
|
|
|
+ if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
|
|
|
|
|
+ s.debugln("selfjoin and promote to leader")
|
|
|
|
|
+ s.setState(Leader)
|
|
|
|
|
+ s.processCommand(req, e)
|
|
|
|
|
+ } else {
|
|
|
err = NotLeaderError
|
|
err = NotLeaderError
|
|
|
}
|
|
}
|
|
|
|
|
+ case *AppendEntriesRequest:
|
|
|
|
|
+ // If heartbeats get too close to the election timeout then send an event.
|
|
|
|
|
+ elapsedTime := time.Now().Sub(since)
|
|
|
|
|
+ if elapsedTime > time.Duration(float64(electionTimeout)*ElectionTimeoutThresholdPercent) {
|
|
|
|
|
+ s.DispatchEvent(newEvent(ElectionTimeoutThresholdEventType, elapsedTime, nil))
|
|
|
|
|
+ }
|
|
|
|
|
+ e.returnValue, update = s.processAppendEntriesRequest(req)
|
|
|
|
|
+ case *RequestVoteRequest:
|
|
|
|
|
+ e.returnValue, update = s.processRequestVoteRequest(req)
|
|
|
|
|
+ case *SnapshotRequest:
|
|
|
|
|
+ e.returnValue = s.processSnapshotRequest(req)
|
|
|
|
|
+ default:
|
|
|
|
|
+ err = NotLeaderError
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
// Callback to event.
|
|
// Callback to event.
|
|
|
e.c <- err
|
|
e.c <- err
|
|
|
|
|
|
|
|
case <-timeoutChan:
|
|
case <-timeoutChan:
|
|
|
-
|
|
|
|
|
// only allow synced follower to promote to candidate
|
|
// only allow synced follower to promote to candidate
|
|
|
if s.promotable() {
|
|
if s.promotable() {
|
|
|
s.setState(Candidate)
|
|
s.setState(Candidate)
|
|
@@ -674,8 +691,6 @@ func (s *server) followerLoop() {
 // The event loop that is run when the server is in a Candidate state.
 func (s *server) candidateLoop() {
-	lastLogIndex, lastLogTerm := s.log.lastInfo()
-
 	// Clear leader value.
 	prevLeader := s.leader
 	s.leader = ""
@@ -683,81 +698,77 @@ func (s *server) candidateLoop() {
 		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
 	}

-	for s.State() == Candidate {
-		// Increment current term, vote for self.
-		s.currentTerm++
-		s.votedFor = s.name
+	lastLogIndex, lastLogTerm := s.log.lastInfo()
+	doVote := true
+	votesGranted := 0
+	var timeoutChan <-chan time.Time
+	var respChan chan *RequestVoteResponse

-		// Send RequestVote RPCs to all other servers.
-		respChan := make(chan *RequestVoteResponse, len(s.peers))
-		for _, peer := range s.peers {
-			go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
-		}
+	for s.State() == Candidate {
+		if doVote {
+			// Increment current term, vote for self.
+			s.currentTerm++
+			s.votedFor = s.name

-		// Wait for either:
-		// * Votes received from majority of servers: become leader
-		// * AppendEntries RPC received from new leader: step down.
-		// * Election timeout elapses without election resolution: increment term, start new election
-		// * Discover higher term: step down (§5.1)
-		votesGranted := 1
-		timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
-		timeout := false
-
-		for {
-			// If we received enough votes then stop waiting for more votes.
-			s.debugln("server.candidate.votes: ", votesGranted, " quorum:", s.QuorumSize())
-			if votesGranted >= s.QuorumSize() {
-				s.setState(Leader)
-				break
+			// Send RequestVote RPCs to all other servers.
+			respChan = make(chan *RequestVoteResponse, len(s.peers))
+			for _, peer := range s.peers {
+				go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
 			}

-			// Collect votes from peers.
-			select {
-			case resp := <-respChan:
-				if resp.VoteGranted {
-					s.debugln("server.candidate.vote.granted: ", votesGranted)
-					votesGranted++
-				} else if resp.Term > s.currentTerm {
-					s.debugln("server.candidate.vote.failed")
-					s.setCurrentTerm(resp.Term, "", false)
-				} else {
-					s.debugln("server.candidate.vote: denied")
-				}
+			// Wait for either:
+			// * Votes received from majority of servers: become leader
+			// * AppendEntries RPC received from new leader: step down.
+			// * Election timeout elapses without election resolution: increment term, start new election
+			// * Discover higher term: step down (§5.1)
+			votesGranted = 1
+			timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
+			doVote = false
+		}

-			case e := <-s.c:
-				var err error
-				if e.target == &stopValue {
-					s.setState(Stopped)
-				} else {
-					switch req := e.target.(type) {
-					case Command:
-						err = NotLeaderError
-					case *AppendEntriesRequest:
-						e.returnValue, _ = s.processAppendEntriesRequest(req)
-					case *RequestVoteRequest:
-						e.returnValue, _ = s.processRequestVoteRequest(req)
-					}
-				}
-				// Callback to event.
-				e.c <- err
+		// If we received enough votes then stop waiting for more votes.
+		// And return from the candidate loop
+		if votesGranted == s.QuorumSize() {
+			s.debugln("server.candidate.recv.enough.votes")
+			s.setState(Leader)
+			return
+		}

-			case <-timeoutChan:
-				timeout = true
+		// Collect votes from peers.
+		select {
+		case stop := <-s.stopped:
+			s.setState(Stopped)
+			stop <- true
+			return
+
+		case resp := <-respChan:
+			if success := s.processVoteResponse(resp); success {
+				s.debugln("server.candidate.vote.granted: ", votesGranted)
+				votesGranted++
 			}

-			// both process AER and RVR can make the server to follower
-			// also break when timeout happens
-			if s.State() != Candidate || timeout {
-				break
+		case e := <-s.c:
+			var err error
+			switch req := e.target.(type) {
+			case Command:
+				err = NotLeaderError
+			case *AppendEntriesRequest:
+				e.returnValue, _ = s.processAppendEntriesRequest(req)
+			case *RequestVoteRequest:
+				e.returnValue, _ = s.processRequestVoteRequest(req)
 			}
+
+			// Callback to event.
+			e.c <- err
+
+		case <-timeoutChan:
+			doVote = true
 		}
-		// continue when timeout happened
 	}
 }

 // The event loop that is run when the server is in a Leader state.
 func (s *server) leaderLoop() {
-	s.setState(Leader)
 	logIndex, _ := s.log.lastInfo()

 	// Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
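// Illustrative sketch (not part of the patch): the rewritten loop counts the
// candidate's own vote first (votesGranted = 1) and promotes to leader once
// the count reaches s.QuorumSize(). For a cluster of n members that majority
// is n/2 + 1; the helper name here is hypothetical.
func majority(memberCount int) int {
	return memberCount/2 + 1 // 3 members -> 2 votes, 5 members -> 3 votes
}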
@@ -777,25 +788,26 @@ func (s *server) leaderLoop() {
 	for s.State() == Leader {
 		var err error
 		select {
+		case stop := <-s.stopped:
+			// Stop all peers before stop
+			for _, peer := range s.peers {
+				peer.stopHeartbeat(false)
+			}
+			s.setState(Stopped)
+			stop <- true
+			return
+
 		case e := <-s.c:
-			if e.target == &stopValue {
-				// Stop all peers before stop
-				for _, peer := range s.peers {
-					peer.stopHeartbeat(false)
-				}
-				s.setState(Stopped)
-			} else {
-				switch req := e.target.(type) {
-				case Command:
-					s.processCommand(req, e)
-					continue
-				case *AppendEntriesRequest:
-					e.returnValue, _ = s.processAppendEntriesRequest(req)
-				case *AppendEntriesResponse:
-					s.processAppendEntriesResponse(req)
-				case *RequestVoteRequest:
-					e.returnValue, _ = s.processRequestVoteRequest(req)
-				}
+			switch req := e.target.(type) {
+			case Command:
+				s.processCommand(req, e)
+				continue
+			case *AppendEntriesRequest:
+				e.returnValue, _ = s.processAppendEntriesRequest(req)
+			case *AppendEntriesResponse:
+				s.processAppendEntriesResponse(req)
+			case *RequestVoteRequest:
+				e.returnValue, _ = s.processRequestVoteRequest(req)
 			}

 			// Callback to event.
@@ -807,16 +819,15 @@ func (s *server) leaderLoop() {
 	}

 func (s *server) snapshotLoop() {
-	s.setState(Snapshotting)
-
 	for s.State() == Snapshotting {
 		var err error
-
-		e := <-s.c
-
-		if e.target == &stopValue {
+		select {
+		case stop := <-s.stopped:
 			s.setState(Stopped)
-		} else {
+			stop <- true
+			return
+
+		case e := <-s.c:
 			switch req := e.target.(type) {
 			case Command:
 				err = NotLeaderError
@@ -827,9 +838,9 @@ func (s *server) snapshotLoop() {
 			case *SnapshotRecoveryRequest:
 				e.returnValue = s.processSnapshotRecoveryRequest(req)
 			}
+			// Callback to event.
+			e.c <- err
 		}
-		// Callback to event.
-		e.c <- err
 	}
 }
@@ -891,8 +902,17 @@ func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append
 		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
 	}

-	// Update term and leader.
-	s.setCurrentTerm(req.Term, req.LeaderName, true)
+	if req.Term == s.currentTerm {
+		_assert(s.state != Leader, "leader.elected.at.same.term.%d\n", s.currentTerm)
+		// change state to follower
+		s.state = Follower
+		// discover new leader when candidate
+		// save leader name when follower
+		s.leader = req.LeaderName
+	} else {
+		// Update term and leader.
+		s.updateCurrentTerm(req.Term, req.LeaderName)
+	}

 	// Reject if log doesn't contain a matching previous entry.
 	if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
@@ -923,7 +943,7 @@ func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append
 func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
 	// If we find a higher term then change to a follower and exit.
 	if resp.Term() > s.Term() {
-		s.setCurrentTerm(resp.Term(), "", false)
+		s.updateCurrentTerm(resp.Term(), "")
 		return
 	}
@@ -963,6 +983,25 @@ func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
 	}
 }

+// processVoteResponse processes a vote response:
+// 1. if the vote is granted for the current term of the candidate, return true
+// 2. if the vote is denied because the peer has a larger term, update the term
+//    of this server, which will also cause the candidate to step down, and return false
+// 3. if the vote is for a smaller term, ignore it and return false
+func (s *server) processVoteResponse(resp *RequestVoteResponse) bool {
+	if resp.VoteGranted && resp.Term == s.currentTerm {
+		return true
+	}
+
+	if resp.Term > s.currentTerm {
+		s.debugln("server.candidate.vote.failed")
+		s.updateCurrentTerm(resp.Term, "")
+	} else {
+		s.debugln("server.candidate.vote: denied")
+	}
+	return false
+}
+
 //--------------------------------------
 // Request Vote
 //--------------------------------------
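// Illustrative sketch (not part of the patch): the three outcomes described in
// the processVoteResponse comment, restated as a plain decision table. All
// names are hypothetical; only the logic mirrors the new function.
func voteOutcome(granted bool, respTerm, currentTerm uint64) string {
	switch {
	case granted && respTerm == currentTerm:
		return "count the vote"
	case respTerm > currentTerm:
		return "step down: a peer has a larger term"
	default:
		return "ignore: stale or denied vote"
	}
}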
@@ -985,10 +1024,12 @@ func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
 		return newRequestVoteResponse(s.currentTerm, false), false
 	}

-	s.setCurrentTerm(req.Term, "", false)
-
-	// If we've already voted for a different candidate then don't vote for this candidate.
-	if s.votedFor != "" && s.votedFor != req.CandidateName {
+	// If the term of the requesting peer is larger than this node's, update the term.
+	// If the term is equal and we've already voted for a different candidate then
+	// don't vote for this candidate.
+	if req.Term > s.Term() {
+		s.updateCurrentTerm(req.Term, "")
+	} else if s.votedFor != "" && s.votedFor != req.CandidateName {
 		s.debugln("server.deny.vote: cause duplicate vote: ", req.CandidateName,
 			" already vote for ", s.votedFor)
 		return newRequestVoteResponse(s.currentTerm, false), false
|
|
|
|
|
|
|
|
// Stop peer and remove it.
|
|
// Stop peer and remove it.
|
|
|
if s.State() == Leader {
|
|
if s.State() == Leader {
|
|
|
- peer.stopHeartbeat(true)
|
|
|
|
|
|
|
+ // We create a go routine here to avoid potential deadlock.
|
|
|
|
|
+ // We are holding log write lock when reach this line of code.
|
|
|
|
|
+ // Peer.stopHeartbeat can be blocked without go routine, if the
|
|
|
|
|
+ // target go routine (which we want to stop) is calling
|
|
|
|
|
+ // log.getEntriesAfter and waiting for log read lock.
|
|
|
|
|
+ // So we might be holding log lock and waiting for log lock,
|
|
|
|
|
+ // which lead to a deadlock.
|
|
|
|
|
+ // TODO(xiangli) refactor log lock
|
|
|
|
|
+ go peer.stopHeartbeat(true)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
delete(s.peers, name)
|
|
delete(s.peers, name)
|
|
@@ -1075,30 +1124,35 @@ func (s *server) RemovePeer(name string) error {
|
|
|
//--------------------------------------
|
|
//--------------------------------------
|
|
|
|
|
|
|
|
func (s *server) TakeSnapshot() error {
|
|
func (s *server) TakeSnapshot() error {
|
|
|
- // TODO: put a snapshot mutex
|
|
|
|
|
- s.debugln("take Snapshot")
|
|
|
|
|
|
|
+ if s.stateMachine == nil {
|
|
|
|
|
+ return errors.New("Snapshot: Cannot create snapshot. Missing state machine.")
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
|
|
+ // Shortcut without lock
|
|
|
// Exit if the server is currently creating a snapshot.
|
|
// Exit if the server is currently creating a snapshot.
|
|
|
- if s.currentSnapshot != nil {
|
|
|
|
|
- return errors.New("handling snapshot")
|
|
|
|
|
|
|
+ if s.pendingSnapshot != nil {
|
|
|
|
|
+ return errors.New("Snapshot: Last snapshot is not finished.")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Exit if there are no logs yet in the system.
|
|
|
|
|
|
|
+ // TODO: acquire the lock and no more committed is allowed
|
|
|
|
|
+ // This will be done after finishing refactoring heartbeat
|
|
|
|
|
+ s.debugln("take.snapshot")
|
|
|
|
|
+
|
|
|
lastIndex, lastTerm := s.log.commitInfo()
|
|
lastIndex, lastTerm := s.log.commitInfo()
|
|
|
- path := s.SnapshotPath(lastIndex, lastTerm)
|
|
|
|
|
- if lastIndex == 0 {
|
|
|
|
|
- return errors.New("No logs")
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // check if there is log has been committed since the
|
|
|
|
|
+ // last snapshot.
|
|
|
|
|
+ if lastIndex == s.log.startIndex {
|
|
|
|
|
+ return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- var state []byte
|
|
|
|
|
- var err error
|
|
|
|
|
- if s.stateMachine != nil {
|
|
|
|
|
- state, err = s.stateMachine.Save()
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
- } else {
|
|
|
|
|
- state = []byte{0}
|
|
|
|
|
|
|
+ path := s.SnapshotPath(lastIndex, lastTerm)
|
|
|
|
|
+ // Attach snapshot to pending snapshot and save it to disk.
|
|
|
|
|
+ s.pendingSnapshot = &Snapshot{lastIndex, lastTerm, nil, nil, path}
|
|
|
|
|
+
|
|
|
|
|
+ state, err := s.stateMachine.Save()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Clone the list of peers.
|
|
// Clone the list of peers.
|
|
@@ -1108,8 +1162,9 @@ func (s *server) TakeSnapshot() error {
 	}
 	peers = append(peers, &Peer{Name: s.Name(), ConnectionString: s.connectionString})

-	// Attach current snapshot and save it to disk.
-	s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
+	// Attach the peers and state to the pending snapshot and save it to disk.
+	s.pendingSnapshot.Peers = peers
+	s.pendingSnapshot.State = state
 	s.saveSnapshot()

 	// We keep some log entries after the snapshot.
|
|
|
|
|
|
|
|
// Retrieves the log path for the server.
|
|
// Retrieves the log path for the server.
|
|
|
func (s *server) saveSnapshot() error {
|
|
func (s *server) saveSnapshot() error {
|
|
|
- if s.currentSnapshot == nil {
|
|
|
|
|
- return errors.New("no snapshot to save")
|
|
|
|
|
|
|
+ if s.pendingSnapshot == nil {
|
|
|
|
|
+ return errors.New("pendingSnapshot.is.nil")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Write snapshot to disk.
|
|
// Write snapshot to disk.
|
|
|
- if err := s.currentSnapshot.save(); err != nil {
|
|
|
|
|
|
|
+ if err := s.pendingSnapshot.save(); err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Swap the current and last snapshots.
|
|
// Swap the current and last snapshots.
|
|
|
- tmp := s.lastSnapshot
|
|
|
|
|
- s.lastSnapshot = s.currentSnapshot
|
|
|
|
|
|
|
+ tmp := s.snapshot
|
|
|
|
|
+ s.snapshot = s.pendingSnapshot
|
|
|
|
|
|
|
|
// Delete the previous snapshot if there is any change
|
|
// Delete the previous snapshot if there is any change
|
|
|
- if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) {
|
|
|
|
|
|
|
+ if tmp != nil && !(tmp.LastIndex == s.snapshot.LastIndex && tmp.LastTerm == s.snapshot.LastTerm) {
|
|
|
tmp.remove()
|
|
tmp.remove()
|
|
|
}
|
|
}
|
|
|
- s.currentSnapshot = nil
|
|
|
|
|
|
|
+ s.pendingSnapshot = nil
|
|
|
|
|
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
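// Illustrative sketch (not part of the patch): the rename from
// currentSnapshot/lastSnapshot to pendingSnapshot/snapshot makes the two-phase
// life cycle explicit — build and persist into the pending slot, then promote
// it and clear the pending slot, which is what saveSnapshot above now does.
// Hypothetical type for illustration only:
type snapshotSlots struct {
	snapshot        *Snapshot // last snapshot successfully written to disk
	pendingSnapshot *Snapshot // snapshot currently being built or saved
}

func (ss *snapshotSlots) promote() {
	ss.snapshot = ss.pendingSnapshot
	ss.pendingSnapshot = nil
}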
@@ -1183,7 +1238,7 @@ func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *Snapshot
 func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
 	// Recover state sent from request.
 	if err := s.stateMachine.Recovery(req.State); err != nil {
-		return newSnapshotRecoveryResponse(req.LastTerm, false, req.LastIndex)
+		panic("cannot recover from previous state")
 	}

 	// Recover the cluster configuration.
|
|
|
s.log.updateCommitIndex(req.LastIndex)
|
|
s.log.updateCommitIndex(req.LastIndex)
|
|
|
|
|
|
|
|
// Create local snapshot.
|
|
// Create local snapshot.
|
|
|
- s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, s.SnapshotPath(req.LastIndex, req.LastTerm)}
|
|
|
|
|
|
|
+ s.pendingSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, s.SnapshotPath(req.LastIndex, req.LastTerm)}
|
|
|
s.saveSnapshot()
|
|
s.saveSnapshot()
|
|
|
|
|
|
|
|
// Clear the previous log entries.
|
|
// Clear the previous log entries.
|
|
|
s.log.compact(req.LastIndex, req.LastTerm)
|
|
s.log.compact(req.LastIndex, req.LastTerm)
|
|
|
|
|
|
|
|
return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
|
|
return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
|
|
|
-
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Load a snapshot at restart
|
|
// Load a snapshot at restart
|
|
@@ -1212,6 +1266,7 @@ func (s *server) LoadSnapshot() error {
|
|
|
// Open snapshot/ directory.
|
|
// Open snapshot/ directory.
|
|
|
dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
|
|
dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
|
|
+ s.debugln("cannot.open.snapshot: ", err)
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -1224,7 +1279,8 @@ func (s *server) LoadSnapshot() error {
 	dir.Close()

 	if len(filenames) == 0 {
-		return errors.New("no snapshot")
+		s.debugln("no.snapshot.to.load")
+		return nil
 	}

 	// Grab the latest snapshot.
@@ -1244,7 +1300,7 @@ func (s *server) LoadSnapshot() error {
 	if err != nil {
 		return err
 	} else if n != 1 {
-		return errors.New("Bad snapshot file")
+		return errors.New("checksum.err: bad.snapshot.file")
 	}

 	// Load remaining snapshot contents.
@@ -1261,26 +1317,26 @@ func (s *server) LoadSnapshot() error {
 	}

 	// Decode snapshot.
-	if err = json.Unmarshal(b, &s.lastSnapshot); err != nil {
-		s.debugln("unmarshal error: ", err)
+	if err = json.Unmarshal(b, &s.snapshot); err != nil {
+		s.debugln("unmarshal.snapshot.error: ", err)
 		return err
 	}

 	// Recover snapshot into state machine.
-	if err = s.stateMachine.Recovery(s.lastSnapshot.State); err != nil {
-		s.debugln("recovery error: ", err)
+	if err = s.stateMachine.Recovery(s.snapshot.State); err != nil {
+		s.debugln("recovery.snapshot.error: ", err)
 		return err
 	}

 	// Recover cluster configuration.
-	for _, peer := range s.lastSnapshot.Peers {
+	for _, peer := range s.snapshot.Peers {
 		s.AddPeer(peer.Name, peer.ConnectionString)
 	}

 	// Update log state.
-	s.log.startTerm = s.lastSnapshot.LastTerm
-	s.log.startIndex = s.lastSnapshot.LastIndex
-	s.log.updateCommitIndex(s.lastSnapshot.LastIndex)
+	s.log.startTerm = s.snapshot.LastTerm
+	s.log.startIndex = s.snapshot.LastIndex
+	s.log.updateCommitIndex(s.snapshot.LastIndex)

 	return err
 }
@@ -1289,6 +1345,14 @@ func (s *server) LoadSnapshot() error {
 // Config File
 //--------------------------------------

+// FlushCommitIndex flushes the commit index to disk, so that when the raft
+// server restarts it will commit up to the flushed commitIndex.
+func (s *server) FlushCommitIndex() {
+	s.debugln("server.conf.update")
+	// Write the configuration to file.
+	s.writeConf()
+}
+
 func (s *server) writeConf() {

 	peers := make([]*Peer, len(s.peers))