Browse Source

Merge pull request #1744 from xiang90/next

raft: optimistically increase the next if the follower is already matched
Xiang Li 11 years ago
parent
commit
b50f331558
2 changed files with 67 additions and 7 deletions
  1. 22 3
      raft/raft.go
  2. 45 4
      raft/raft_test.go

+ 22 - 3
raft/raft.go

@@ -63,12 +63,26 @@ func (pr *progress) update(n uint64) {
 	pr.next = n + 1
 }
 
+func (pr *progress) optimisticUpdate(n uint64) {
+	pr.next = n + 1
+}
+
 // maybeDecrTo returns false if the given to index comes from an out of order message.
 // Otherwise it decreases the progress next index and returns true.
 func (pr *progress) maybeDecrTo(to uint64) bool {
-	// the rejection must be stale if the progress has matched with
-	// follower or "to" does not match next - 1
-	if pr.match != 0 || pr.next-1 != to {
+	if pr.match != 0 {
+		// the rejection must be stale if the progress has matched and "to"
+		// is smaller than "match".
+		if to <= pr.match {
+			return false
+		}
+		// directly decrease next to match + 1
+		pr.next = pr.match + 1
+		return true
+	}
+
+	// the rejection must be stale if "to" does not match next - 1
+	if pr.next-1 != to {
 		return false
 	}
 
@@ -196,6 +210,11 @@ func (r *raft) sendAppend(to uint64) {
 		m.LogTerm = r.raftLog.term(pr.next - 1)
 		m.Entries = r.raftLog.entries(pr.next)
 		m.Commit = r.raftLog.committed
+		// optimistically increase the next if the follower
+		// has been matched.
+		if n := len(m.Entries); pr.match != 0 && n != 0 {
+			pr.optimisticUpdate(m.Entries[n-1].Index)
+		}
 	}
 	r.send(m)
 }

+ 45 - 4
raft/raft_test.go

@@ -61,8 +61,18 @@ func TestProgressMaybeDecr(t *testing.T) {
 			1, 0, 0, false, 0,
 		},
 		{
-			// match != 0 is always false
-			5, 10, 9, false, 10,
+			// match != 0 and to is greater than match
+			// directly decrease to match+1
+			5, 10, 5, false, 10,
+		},
+		{
+			// match != 0 and to is greater than match
+			// directly decrease to match+1
+			5, 10, 4, false, 10,
+		},
+		{
+			// match != 0 and to is not greater than match
+			5, 10, 9, true, 6,
 		},
 		{
 			// next-1 != to is always false
@@ -869,7 +879,7 @@ func TestAllServerStepdown(t *testing.T) {
 }
 
 func TestLeaderAppResp(t *testing.T) {
-	// initial progress: match = 0; netx = 3
+	// initial progress: match = 0; next = 3
 	tests := []struct {
 		index  uint64
 		reject bool
@@ -883,7 +893,7 @@ func TestLeaderAppResp(t *testing.T) {
 	}{
 		{3, true, 0, 3, 0, 0, 0},  // stale resp; no replies
 		{2, true, 0, 2, 1, 1, 0},  // denied resp; leader does not commit; decrese next and send probing msg
-		{2, false, 2, 3, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
+		{2, false, 2, 4, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
 		{0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies
 	}
 
@@ -1017,6 +1027,37 @@ func TestRecvMsgBeat(t *testing.T) {
 	}
 }
 
+func TestLeaderIncreaseNext(t *testing.T) {
+	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
+	tests := []struct {
+		// progress
+		match uint64
+		next  uint64
+
+		wnext uint64
+	}{
+		// match is not zero, optimistically increase next
+		// previous entries + noop entry + propose + 1
+		{1, 2, uint64(len(previousEnts) + 1 + 1 + 1)},
+		// match is zero, not optimistically increase next
+		{0, 2, 2},
+	}
+
+	for i, tt := range tests {
+		sm := newRaft(1, []uint64{1, 2}, 10, 1)
+		sm.raftLog.append(0, previousEnts...)
+		sm.becomeCandidate()
+		sm.becomeLeader()
+		sm.prs[2].match, sm.prs[2].next = tt.match, tt.next
+		sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+
+		p := sm.prs[2]
+		if p.next != tt.wnext {
+			t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext)
+		}
+	}
+}
+
 func TestRestore(t *testing.T) {
 	s := pb.Snapshot{
 		Index: 11, // magic number