Browse Source

raft: optimistically increase the next if the follower is already matched

This is useful since we want to pipeline the appendEntry requests. Without
enabling optimistic increasing, the second pipelining appendEntry request
will include the entries the first one has already sent out. We decrease
the next directly to match if the leader receives a rejection for a matched
follower. This happens if one pipelining request get lost and following ones
arrives at the follower.
Xiang Li 11 years ago
parent
commit
4c1fd07311
2 changed files with 67 additions and 7 deletions
  1. 22 3
      raft/raft.go
  2. 45 4
      raft/raft_test.go

+ 22 - 3
raft/raft.go

@@ -63,12 +63,26 @@ func (pr *progress) update(n uint64) {
 	pr.next = n + 1
 }
 
+func (pr *progress) optimisticUpdate(n uint64) {
+	pr.next = n + 1
+}
+
 // maybeDecrTo returns false if the given to index comes from an out of order message.
 // Otherwise it decreases the progress next index and returns true.
 func (pr *progress) maybeDecrTo(to uint64) bool {
-	// the rejection must be stale if the progress has matched with
-	// follower or "to" does not match next - 1
-	if pr.match != 0 || pr.next-1 != to {
+	if pr.match != 0 {
+		// the rejection must be stale if the progress has matched and "to"
+		// is smaller than "match".
+		if to <= pr.match {
+			return false
+		}
+		// directly decrease next to match + 1
+		pr.next = pr.match + 1
+		return true
+	}
+
+	// the rejection must be stale if "to" does not match next - 1
+	if pr.next-1 != to {
 		return false
 	}
 
@@ -196,6 +210,11 @@ func (r *raft) sendAppend(to uint64) {
 		m.LogTerm = r.raftLog.term(pr.next - 1)
 		m.Entries = r.raftLog.entries(pr.next)
 		m.Commit = r.raftLog.committed
+		// optimistically increase the next if the follower
+		// has been matched.
+		if n := len(m.Entries); pr.match != 0 && n != 0 {
+			pr.optimisticUpdate(m.Entries[n-1].Index)
+		}
 	}
 	r.send(m)
 }

+ 45 - 4
raft/raft_test.go

@@ -61,8 +61,18 @@ func TestProgressMaybeDecr(t *testing.T) {
 			1, 0, 0, false, 0,
 		},
 		{
-			// match != 0 is always false
-			5, 10, 9, false, 10,
+			// match != 0 and to is greater than match
+			// directly decrease to match+1
+			5, 10, 5, false, 10,
+		},
+		{
+			// match != 0 and to is greater than match
+			// directly decrease to match+1
+			5, 10, 4, false, 10,
+		},
+		{
+			// match != 0 and to is not greater than match
+			5, 10, 9, true, 6,
 		},
 		{
 			// next-1 != to is always false
@@ -840,7 +850,7 @@ func TestAllServerStepdown(t *testing.T) {
 }
 
 func TestLeaderAppResp(t *testing.T) {
-	// initial progress: match = 0; netx = 3
+	// initial progress: match = 0; next = 3
 	tests := []struct {
 		index  uint64
 		reject bool
@@ -854,7 +864,7 @@ func TestLeaderAppResp(t *testing.T) {
 	}{
 		{3, true, 0, 3, 0, 0, 0},  // stale resp; no replies
 		{2, true, 0, 2, 1, 1, 0},  // denied resp; leader does not commit; decrese next and send probing msg
-		{2, false, 2, 3, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
+		{2, false, 2, 4, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
 		{0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies
 	}
 
@@ -988,6 +998,37 @@ func TestRecvMsgBeat(t *testing.T) {
 	}
 }
 
+func TestLeaderIncreaseNext(t *testing.T) {
+	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
+	tests := []struct {
+		// progress
+		match uint64
+		next  uint64
+
+		wnext uint64
+	}{
+		// match is not zero, optimistically increase next
+		// previous entries + noop entry + propose + 1
+		{1, 2, uint64(len(previousEnts) + 1 + 1 + 1)},
+		// match is zero, not optimistically increase next
+		{0, 2, 2},
+	}
+
+	for i, tt := range tests {
+		sm := newRaft(1, []uint64{1, 2}, 10, 1)
+		sm.raftLog.append(0, previousEnts...)
+		sm.becomeCandidate()
+		sm.becomeLeader()
+		sm.prs[2].match, sm.prs[2].next = tt.match, tt.next
+		sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+
+		p := sm.prs[2]
+		if p.next != tt.wnext {
+			t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext)
+		}
+	}
+}
+
 func TestRestore(t *testing.T) {
 	s := pb.Snapshot{
 		Index: 11, // magic number