Browse Source

raft: leader waits for the reply of previous message when follower is not in good path.

It is reasonable for the leader to wait for the reply before sending out the next
msgApp or msgSnap for the follower in bad path. Or the leader will send out useless
messages if the previous message is rejected or the previous message is a snapshot.
Especially for the snapshot case, the leader will be 100% to send out duplicate message
including the snapshot, which is a huge waste.

This commit implement a timeout based wait mechanism. The timeout for normal msgApp is a
heartbeatTimeout and the timeout for snapshot is electionTimeout(snapshot is larger). We
can implement a piggyback mechanism(application notifies the msg lost) in the future
if necessary.
Xiang Li 11 years ago
parent
commit
88767d913d
2 changed files with 117 additions and 15 deletions
  1. 30 2
      raft/raft.go
  2. 87 13
      raft/raft_test.go

+ 30 - 2
raft/raft.go

@@ -56,9 +56,13 @@ func (st StateType) MarshalJSON() ([]byte, error) {
 	return []byte(fmt.Sprintf("%q", st.String())), nil
 	return []byte(fmt.Sprintf("%q", st.String())), nil
 }
 }
 
 
-type progress struct{ match, next uint64 }
+type progress struct {
+	match, next uint64
+	wait        int
+}
 
 
 func (pr *progress) update(n uint64) {
 func (pr *progress) update(n uint64) {
+	pr.waitReset()
 	if pr.match < n {
 	if pr.match < n {
 		pr.match = n
 		pr.match = n
 	}
 	}
@@ -72,6 +76,7 @@ func (pr *progress) optimisticUpdate(n uint64) { pr.next = n + 1 }
 // maybeDecrTo returns false if the given to index comes from an out of order message.
 // maybeDecrTo returns false if the given to index comes from an out of order message.
 // Otherwise it decreases the progress next index and returns true.
 // Otherwise it decreases the progress next index and returns true.
 func (pr *progress) maybeDecrTo(to uint64) bool {
 func (pr *progress) maybeDecrTo(to uint64) bool {
+	pr.waitReset()
 	if pr.match != 0 {
 	if pr.match != 0 {
 		// the rejection must be stale if the progress has matched and "to"
 		// the rejection must be stale if the progress has matched and "to"
 		// is smaller than "match".
 		// is smaller than "match".
@@ -94,7 +99,19 @@ func (pr *progress) maybeDecrTo(to uint64) bool {
 	return true
 	return true
 }
 }
 
 
-func (pr *progress) String() string { return fmt.Sprintf("next = %d, match = %d", pr.next, pr.match) }
+func (pr *progress) waitDecr(i int) {
+	pr.wait -= i
+	if pr.wait < 0 {
+		pr.wait = 0
+	}
+}
+func (pr *progress) waitSet(w int)    { pr.wait = w }
+func (pr *progress) waitReset()       { pr.wait = 0 }
+func (pr *progress) shouldWait() bool { return pr.match == 0 && pr.wait > 0 }
+
+func (pr *progress) String() string {
+	return fmt.Sprintf("next = %d, match = %d, wait = %v", pr.next, pr.match, pr.wait)
+}
 
 
 type raft struct {
 type raft struct {
 	pb.HardState
 	pb.HardState
@@ -203,6 +220,10 @@ func (r *raft) send(m pb.Message) {
 // sendAppend sends RRPC, with entries to the given peer.
 // sendAppend sends RRPC, with entries to the given peer.
 func (r *raft) sendAppend(to uint64) {
 func (r *raft) sendAppend(to uint64) {
 	pr := r.prs[to]
 	pr := r.prs[to]
+	if pr.shouldWait() {
+		log.Printf("raft: %x ignored sending %s to %x [%s]", r.id, pb.MsgApp, to, pr)
+		return
+	}
 	m := pb.Message{}
 	m := pb.Message{}
 	m.To = to
 	m.To = to
 	if r.needSnapshot(pr.next) {
 	if r.needSnapshot(pr.next) {
@@ -218,6 +239,7 @@ func (r *raft) sendAppend(to uint64) {
 		sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
 		sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
 		log.Printf("raft: %x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
 		log.Printf("raft: %x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
 			r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
 			r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
+		pr.waitSet(r.electionTimeout)
 	} else {
 	} else {
 		m.Type = pb.MsgApp
 		m.Type = pb.MsgApp
 		m.Index = pr.next - 1
 		m.Index = pr.next - 1
@@ -228,6 +250,11 @@ func (r *raft) sendAppend(to uint64) {
 		// has been matched.
 		// has been matched.
 		if n := len(m.Entries); pr.match != 0 && n != 0 {
 		if n := len(m.Entries); pr.match != 0 && n != 0 {
 			pr.optimisticUpdate(m.Entries[n-1].Index)
 			pr.optimisticUpdate(m.Entries[n-1].Index)
+		} else if pr.match == 0 {
+			// TODO (xiangli): better way to find out if the follwer is in good path or not
+			// a follower might be in bad path even if match != 0, since we optmistically
+			// increase the next.
+			pr.waitSet(r.heartbeatTimeout)
 		}
 		}
 	}
 	}
 	r.send(m)
 	r.send(m)
@@ -268,6 +295,7 @@ func (r *raft) bcastHeartbeat() {
 			continue
 			continue
 		}
 		}
 		r.sendHeartbeat(i)
 		r.sendHeartbeat(i)
+		r.prs[i].waitDecr(r.heartbeatTimeout)
 	}
 	}
 }
 }
 
 

+ 87 - 13
raft/raft_test.go

@@ -70,10 +70,10 @@ func TestProgressUpdate(t *testing.T) {
 		}
 		}
 		p.update(tt.update)
 		p.update(tt.update)
 		if p.match != tt.wm {
 		if p.match != tt.wm {
-			t.Errorf("#%d: match=%d, want %d", i, p.match, tt.wm)
+			t.Errorf("#%d: match= %d, want %d", i, p.match, tt.wm)
 		}
 		}
 		if p.next != tt.wn {
 		if p.next != tt.wn {
-			t.Errorf("#%d: next=%d, want %d", i, p.next, tt.wn)
+			t.Errorf("#%d: next= %d, want %d", i, p.next, tt.wn)
 		}
 		}
 	}
 	}
 }
 }
@@ -132,17 +132,85 @@ func TestProgressMaybeDecr(t *testing.T) {
 			next:  tt.n,
 			next:  tt.n,
 		}
 		}
 		if g := p.maybeDecrTo(tt.to); g != tt.w {
 		if g := p.maybeDecrTo(tt.to); g != tt.w {
-			t.Errorf("#%d: maybeDecrTo=%t, want %t", i, g, tt.w)
+			t.Errorf("#%d: maybeDecrTo= %t, want %t", i, g, tt.w)
 		}
 		}
 		if gm := p.match; gm != tt.m {
 		if gm := p.match; gm != tt.m {
-			t.Errorf("#%d: match=%d, want %d", i, gm, tt.m)
+			t.Errorf("#%d: match= %d, want %d", i, gm, tt.m)
 		}
 		}
 		if gn := p.next; gn != tt.wn {
 		if gn := p.next; gn != tt.wn {
-			t.Errorf("#%d: next=%d, want %d", i, gn, tt.wn)
+			t.Errorf("#%d: next= %d, want %d", i, gn, tt.wn)
 		}
 		}
 	}
 	}
 }
 }
 
 
+func TestProgressShouldWait(t *testing.T) {
+	tests := []struct {
+		m    uint64
+		wait int
+
+		w bool
+	}{
+		// match != 0 is always not wait
+		{1, 0, false},
+		{1, 1, false},
+		{0, 1, true},
+		{0, 0, false},
+	}
+	for i, tt := range tests {
+		p := &progress{
+			match: tt.m,
+			wait:  tt.wait,
+		}
+		if g := p.shouldWait(); g != tt.w {
+			t.Errorf("#%d: shouldwait = %t, want %t", i, g, tt.w)
+		}
+	}
+}
+
+// TestProgressWaitReset ensures that progress.Update and progress.DercTo
+// will reset progress.wait.
+func TestProgressWaitReset(t *testing.T) {
+	p := &progress{
+		wait: 1,
+	}
+	p.maybeDecrTo(1)
+	if p.wait != 0 {
+		t.Errorf("wait= %d, want 0", p.wait)
+	}
+	p.wait = 1
+	p.update(2)
+	if p.wait != 0 {
+		t.Errorf("wait= %d, want 0", p.wait)
+	}
+}
+
+// TestProgressDecr ensures raft.heartbeat decreases progress.wait by heartbeat.
+func TestProgressDecr(t *testing.T) {
+	r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
+	r.becomeCandidate()
+	r.becomeLeader()
+	r.prs[2].wait = r.heartbeatTimeout * 2
+
+	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
+	if r.prs[2].wait != r.heartbeatTimeout*(2-1) {
+		t.Errorf("wait = %d, want %d", r.prs[2].wait, r.heartbeatTimeout*(2-1))
+	}
+}
+
+func TestProgressWait(t *testing.T) {
+	r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
+	r.becomeCandidate()
+	r.becomeLeader()
+	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+
+	ms := r.readMessages()
+	if len(ms) != 1 {
+		t.Errorf("len(ms) = %d, want 1", len(ms))
+	}
+}
+
 func TestLeaderElection(t *testing.T) {
 func TestLeaderElection(t *testing.T) {
 	tests := []struct {
 	tests := []struct {
 		*network
 		*network
@@ -269,7 +337,7 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
 	// avoid committing ChangeTerm proposal
 	// avoid committing ChangeTerm proposal
 	tt.ignore(pb.MsgApp)
 	tt.ignore(pb.MsgApp)
 
 
-	// elect 1 as the new leader with term 2
+	// elect 2 as the new leader with term 2
 	tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
 	tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
 
 
 	// no log entries from previous term should be committed
 	// no log entries from previous term should be committed
@@ -279,10 +347,11 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
 	}
 	}
 
 
 	tt.recover()
 	tt.recover()
-
-	// still be able to append a entry
+	// send heartbeat; reset wait
+	tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgBeat})
+	// append an entry at current term
 	tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
 	tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
-
+	// expect the committed to be advanced
 	if sm.raftLog.committed != 5 {
 	if sm.raftLog.committed != 5 {
 		t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5)
 		t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5)
 	}
 	}
@@ -378,6 +447,8 @@ func TestCandidateConcede(t *testing.T) {
 
 
 	// heal the partition
 	// heal the partition
 	tt.recover()
 	tt.recover()
+	// send heartbeat; reset wait
+	tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat})
 
 
 	data := []byte("force follower")
 	data := []byte("force follower")
 	// send a proposal to 2 to flush out a MsgApp to 0
 	// send a proposal to 2 to flush out a MsgApp to 0
@@ -425,18 +496,21 @@ func TestOldMessages(t *testing.T) {
 	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
 	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
 	tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
 	tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
 	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
 	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
-	// pretend we're an old leader trying to make progress
-	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgApp, Term: 1, Entries: []pb.Entry{{Term: 1}}})
+	// pretend we're an old leader trying to make progress; this entry is expected to be ignored.
+	tt.send(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, Entries: []pb.Entry{{Index: 3, Term: 2}}})
+	// commit a new entry
+	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 
 
 	l := &raftLog{
 	l := &raftLog{
 		storage: &MemoryStorage{
 		storage: &MemoryStorage{
 			ents: []pb.Entry{
 			ents: []pb.Entry{
 				{}, {Data: nil, Term: 1, Index: 1},
 				{}, {Data: nil, Term: 1, Index: 1},
 				{Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
 				{Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
+				{Data: []byte("somedata"), Term: 3, Index: 4},
 			},
 			},
 		},
 		},
-		unstable:  unstable{offset: 4},
-		committed: 3,
+		unstable:  unstable{offset: 5},
+		committed: 4,
 	}
 	}
 	base := ltoa(l)
 	base := ltoa(l)
 	for i, p := range tt.peers {
 	for i, p := range tt.peers {