|
@@ -138,10 +138,11 @@ func (c *Config) validate() error {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
type raft struct {
|
|
type raft struct {
|
|
|
- pb.HardState
|
|
|
|
|
-
|
|
|
|
|
id uint64
|
|
id uint64
|
|
|
|
|
|
|
|
|
|
+ Term uint64
|
|
|
|
|
+ Vote uint64
|
|
|
|
|
+
|
|
|
// the log
|
|
// the log
|
|
|
raftLog *raftLog
|
|
raftLog *raftLog
|
|
|
|
|
|
|
@@ -239,6 +240,14 @@ func (r *raft) hasLeader() bool { return r.lead != None }
|
|
|
|
|
|
|
|
func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }
|
|
func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }
|
|
|
|
|
|
|
|
|
|
+func (r *raft) hardState() pb.HardState {
|
|
|
|
|
+ return pb.HardState{
|
|
|
|
|
+ Term: r.Term,
|
|
|
|
|
+ Vote: r.Vote,
|
|
|
|
|
+ Commit: r.raftLog.committed,
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (r *raft) quorum() int { return len(r.prs)/2 + 1 }
|
|
func (r *raft) quorum() int { return len(r.prs)/2 + 1 }
|
|
|
|
|
|
|
|
func (r *raft) nodes() []uint64 {
|
|
func (r *raft) nodes() []uint64 {
|
|
@@ -295,7 +304,7 @@ func (r *raft) sendAppend(to uint64) {
|
|
|
m.Snapshot = snapshot
|
|
m.Snapshot = snapshot
|
|
|
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
|
|
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
|
|
|
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
|
|
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
|
|
|
- r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
|
|
|
|
|
|
|
+ r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
|
|
|
pr.becomeSnapshot(sindex)
|
|
pr.becomeSnapshot(sindex)
|
|
|
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
|
|
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
|
|
|
} else {
|
|
} else {
|
|
@@ -525,7 +534,6 @@ func (r *raft) Step(m pb.Message) error {
|
|
|
if r.state != StateLeader {
|
|
if r.state != StateLeader {
|
|
|
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
|
|
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
|
|
|
r.campaign()
|
|
r.campaign()
|
|
|
- r.Commit = r.raftLog.committed
|
|
|
|
|
} else {
|
|
} else {
|
|
|
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
|
|
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
|
|
|
}
|
|
}
|
|
@@ -550,7 +558,6 @@ func (r *raft) Step(m pb.Message) error {
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
r.step(r, m)
|
|
r.step(r, m)
|
|
|
- r.Commit = r.raftLog.committed
|
|
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -742,8 +749,8 @@ func stepFollower(r *raft, m pb.Message) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (r *raft) handleAppendEntries(m pb.Message) {
|
|
func (r *raft) handleAppendEntries(m pb.Message) {
|
|
|
- if m.Index < r.Commit {
|
|
|
|
|
- r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.Commit})
|
|
|
|
|
|
|
+ if m.Index < r.raftLog.committed {
|
|
|
|
|
+ r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -765,11 +772,11 @@ func (r *raft) handleSnapshot(m pb.Message) {
|
|
|
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
|
|
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
|
|
|
if r.restore(m.Snapshot) {
|
|
if r.restore(m.Snapshot) {
|
|
|
r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
|
|
r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
|
|
|
- r.id, r.Commit, sindex, sterm)
|
|
|
|
|
|
|
+ r.id, r.raftLog.committed, sindex, sterm)
|
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
|
|
|
} else {
|
|
} else {
|
|
|
r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
|
|
r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
|
|
|
- r.id, r.Commit, sindex, sterm)
|
|
|
|
|
|
|
+ r.id, r.raftLog.committed, sindex, sterm)
|
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
|
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -782,13 +789,13 @@ func (r *raft) restore(s pb.Snapshot) bool {
|
|
|
}
|
|
}
|
|
|
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
|
|
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
|
|
|
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
|
|
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
|
|
|
- r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
|
|
|
|
|
|
+ r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
|
|
r.raftLog.commitTo(s.Metadata.Index)
|
|
r.raftLog.commitTo(s.Metadata.Index)
|
|
|
return false
|
|
return false
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
|
|
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
|
|
|
- r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
|
|
|
|
|
|
+ r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
|
|
|
|
|
|
|
r.raftLog.restore(s)
|
|
r.raftLog.restore(s)
|
|
|
r.prs = make(map[uint64]*Progress)
|
|
r.prs = make(map[uint64]*Progress)
|
|
@@ -848,7 +855,6 @@ func (r *raft) loadState(state pb.HardState) {
|
|
|
r.raftLog.committed = state.Commit
|
|
r.raftLog.committed = state.Commit
|
|
|
r.Term = state.Term
|
|
r.Term = state.Term
|
|
|
r.Vote = state.Vote
|
|
r.Vote = state.Vote
|
|
|
- r.Commit = state.Commit
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// isElectionTimeout returns true if r.elapsed is greater than the
|
|
// isElectionTimeout returns true if r.elapsed is greater than the
|