Ver código fonte

raft: do not decrease nextIndex and send entries for stale reply

Xiang Li 11 anos atrás
pai
commit
16ba77767e
2 arquivos alterados com 17 adições e 7 exclusões
  1. 13 4
      raft/raft.go
  2. 4 3
      raft/raft_test.go

+ 13 - 4
raft/raft.go

@@ -69,10 +69,18 @@ func (pr *progress) update(n int64) {
 	pr.next = n + 1
 }
 
-func (pr *progress) decr() {
+func (pr *progress) maybeDecrTo(to int64) bool {
+	// the rejection must be stale if the
+	// progress has matched with follower
+	// or "to" does not match next - 1
+	if pr.match != 0 || pr.next-1 != to {
+		return false
+	}
+
 	if pr.next--; pr.next < 1 {
 		pr.next = 1
 	}
+	return true
 }
 
 func (pr *progress) String() string {
@@ -392,7 +400,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
 	if r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) {
 		r.send(pb.Message{To: m.From, Type: msgAppResp, Index: r.raftLog.lastIndex()})
 	} else {
-		r.send(pb.Message{To: m.From, Type: msgAppResp, Reject: true})
+		r.send(pb.Message{To: m.From, Type: msgAppResp, Index: m.Index, Reject: true})
 	}
 }
 
@@ -436,8 +444,9 @@ func stepLeader(r *raft, m pb.Message) {
 		r.bcastAppend()
 	case msgAppResp:
 		if m.Reject {
-			r.prs[m.From].decr()
-			r.sendAppend(m.From)
+			if r.prs[m.From].maybeDecrTo(m.Index) {
+				r.sendAppend(m.From)
+			}
 		} else {
 			r.prs[m.From].update(m.Index)
 			if r.maybeCommit() {

+ 4 - 3
raft/raft_test.go

@@ -690,8 +690,9 @@ func TestLeaderAppResp(t *testing.T) {
 		windex     int64
 		wcommitted int64
 	}{
-		{-1, true, 1, 1, 0}, // bad resp; leader does not commit; reply with log entries
-		{2, false, 2, 2, 2}, // good resp; leader commits; broadcast with commit index
+		{3, true, 0, 0, 0},  // stale resp; no replies
+		{2, true, 1, 1, 0},  // denied resp; leader does not commit; decrese next and send probing msg
+		{2, false, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
 	}
 
 	for i, tt := range tests {
@@ -857,7 +858,7 @@ func TestProvideSnap(t *testing.T) {
 	// node 1 needs a snapshot
 	sm.prs[2].next = sm.raftLog.offset
 
-	sm.Step(pb.Message{From: 2, To: 1, Type: msgAppResp, Index: -1, Reject: true})
+	sm.Step(pb.Message{From: 2, To: 1, Type: msgAppResp, Index: sm.prs[2].next - 1, Reject: true})
 	msgs := sm.ReadMessages()
 	if len(msgs) != 1 {
 		t.Fatalf("len(msgs) = %d, want 1", len(msgs))