|
|
@@ -140,7 +140,6 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|
|
go func() {
|
|
|
defer r.onStop()
|
|
|
islead := false
|
|
|
- isCandidate := false
|
|
|
|
|
|
for {
|
|
|
select {
|
|
|
@@ -163,7 +162,6 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|
|
|
|
|
atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
|
|
|
islead = rd.RaftState == raft.StateLeader
|
|
|
- isCandidate = rd.RaftState == raft.StateCandidate
|
|
|
rh.updateLeadership()
|
|
|
}
|
|
|
|
|
|
@@ -197,7 +195,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|
|
// For more details, check raft thesis 10.2.1
|
|
|
if islead {
|
|
|
// gofail: var raftBeforeLeaderSend struct{}
|
|
|
- r.sendMessages(rd.Messages)
|
|
|
+ r.transport.Send(r.processMessages(rd.Messages))
|
|
|
}
|
|
|
|
|
|
// gofail: var raftBeforeSave struct{}
|
|
|
@@ -223,21 +221,38 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|
|
r.raftStorage.Append(rd.Entries)
|
|
|
|
|
|
if !islead {
|
|
|
- // gofail: var raftBeforeFollowerSend struct{}
|
|
|
- r.sendMessages(rd.Messages)
|
|
|
- }
|
|
|
- raftDone <- struct{}{}
|
|
|
+ // finish processing incoming messages before we signal the raftDone chan
|
|
|
+ msgs := r.processMessages(rd.Messages)
|
|
|
|
|
|
- r.Advance()
|
|
|
+ // now unblock 'applyAll', which waits on Raft log disk writes before triggering snapshots
|
|
|
+ raftDone <- struct{}{}
|
|
|
|
|
|
- if isCandidate {
|
|
|
- // candidate needs to wait for all pending configuration changes to be applied
|
|
|
- // before continue. Or we might incorrectly count the number of votes (e.g. receive vote from
|
|
|
- // a removed member).
|
|
|
+ // Candidate or follower needs to wait for all pending configuration
|
|
|
+ // changes to be applied before sending messages.
|
|
|
+ // Otherwise we might incorrectly count votes (e.g. votes from removed members).
|
|
|
+ // Also, the follower raft-layer on a slow machine could proceed to become the leader
|
|
|
+ // on its own single-node cluster, before apply-layer applies the config change.
|
|
|
// We simply wait for ALL pending entries to be applied for now.
|
|
|
// We might improve this later on if it causes unnecessary long blocking issues.
|
|
|
- rh.waitForApply()
|
|
|
+ waitApply := false
|
|
|
+ for _, ent := range rd.CommittedEntries {
|
|
|
+ if ent.Type == raftpb.EntryConfChange {
|
|
|
+ waitApply = true
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if waitApply {
|
|
|
+ rh.waitForApply()
|
|
|
+ }
|
|
|
+
|
|
|
+ // gofail: var raftBeforeFollowerSend struct{}
|
|
|
+ r.transport.Send(msgs)
|
|
|
+ } else {
|
|
|
+ // leader already processed 'MsgSnap' and signaled
|
|
|
+ raftDone <- struct{}{}
|
|
|
}
|
|
|
+
|
|
|
+ r.Advance()
|
|
|
case <-r.stopped:
|
|
|
return
|
|
|
}
|
|
|
@@ -258,7 +273,7 @@ func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (r *raftNode) sendMessages(ms []raftpb.Message) {
|
|
|
+func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
|
|
|
sentAppResp := false
|
|
|
for i := len(ms) - 1; i >= 0; i-- {
|
|
|
if r.isIDRemoved(ms[i].To) {
|
|
|
@@ -294,8 +309,7 @@ func (r *raftNode) sendMessages(ms []raftpb.Message) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- r.transport.Send(ms)
|
|
|
+ return ms
|
|
|
}
|
|
|
|
|
|
func (r *raftNode) apply() chan apply {
|