Browse Source

Merge pull request #1207 from coreos/check_stale_resp

raft: do not decrease nextIndex and send entries for stale reply
Xiang Li 11 years ago
parent
commit
5b291b521b
2 changed files with 19 additions and 7 deletions
  1. 15 4
      raft/raft.go
  2. 4 3
      raft/raft_test.go

+ 15 - 4
raft/raft.go

@@ -69,10 +69,20 @@ func (pr *progress) update(n int64) {
 	pr.next = n + 1
 	pr.next = n + 1
 }
 }
 
 
-func (pr *progress) decr() {
+// maybeDecrTo returns false if the given to index comes from an out of order message.
+// Otherwise it decreases the progress next index and returns true.
+func (pr *progress) maybeDecrTo(to int64) bool {
+	// the rejection must be stale if the
+	// progress has matched with follower
+	// or "to" does not match next - 1
+	if pr.match != 0 || pr.next-1 != to {
+		return false
+	}
+
 	if pr.next--; pr.next < 1 {
 	if pr.next--; pr.next < 1 {
 		pr.next = 1
 		pr.next = 1
 	}
 	}
+	return true
 }
 }
 
 
 func (pr *progress) String() string {
 func (pr *progress) String() string {
@@ -392,7 +402,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
 	if r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) {
 	if r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) {
 		r.send(pb.Message{To: m.From, Type: msgAppResp, Index: r.raftLog.lastIndex()})
 		r.send(pb.Message{To: m.From, Type: msgAppResp, Index: r.raftLog.lastIndex()})
 	} else {
 	} else {
-		r.send(pb.Message{To: m.From, Type: msgAppResp, Reject: true})
+		r.send(pb.Message{To: m.From, Type: msgAppResp, Index: m.Index, Reject: true})
 	}
 	}
 }
 }
 
 
@@ -436,8 +446,9 @@ func stepLeader(r *raft, m pb.Message) {
 		r.bcastAppend()
 		r.bcastAppend()
 	case msgAppResp:
 	case msgAppResp:
 		if m.Reject {
 		if m.Reject {
-			r.prs[m.From].decr()
-			r.sendAppend(m.From)
+			if r.prs[m.From].maybeDecrTo(m.Index) {
+				r.sendAppend(m.From)
+			}
 		} else {
 		} else {
 			r.prs[m.From].update(m.Index)
 			r.prs[m.From].update(m.Index)
 			if r.maybeCommit() {
 			if r.maybeCommit() {

+ 4 - 3
raft/raft_test.go

@@ -690,8 +690,9 @@ func TestLeaderAppResp(t *testing.T) {
 		windex     int64
 		windex     int64
 		wcommitted int64
 		wcommitted int64
 	}{
 	}{
-		{-1, true, 1, 1, 0}, // bad resp; leader does not commit; reply with log entries
-		{2, false, 2, 2, 2}, // good resp; leader commits; broadcast with commit index
+		{3, true, 0, 0, 0},  // stale resp; no replies
+		{2, true, 1, 1, 0},  // denied resp; leader does not commit; decrese next and send probing msg
+		{2, false, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
 	}
 	}
 
 
 	for i, tt := range tests {
 	for i, tt := range tests {
@@ -857,7 +858,7 @@ func TestProvideSnap(t *testing.T) {
 	// node 1 needs a snapshot
 	// node 1 needs a snapshot
 	sm.prs[2].next = sm.raftLog.offset
 	sm.prs[2].next = sm.raftLog.offset
 
 
-	sm.Step(pb.Message{From: 2, To: 1, Type: msgAppResp, Index: -1, Reject: true})
+	sm.Step(pb.Message{From: 2, To: 1, Type: msgAppResp, Index: sm.prs[2].next - 1, Reject: true})
 	msgs := sm.ReadMessages()
 	msgs := sm.ReadMessages()
 	if len(msgs) != 1 {
 	if len(msgs) != 1 {
 		t.Fatalf("len(msgs) = %d, want 1", len(msgs))
 		t.Fatalf("len(msgs) = %d, want 1", len(msgs))