Browse Source

Merge pull request #1737 from xiang90/include_commit

raft: include commitIndex in heartbeat
Xiang Li 11 years ago
parent
commit
1635844ebd
2 changed files with 23 additions and 5 deletions
  1. 10 2
      raft/raft.go
  2. 13 3
      raft/raft_test.go

+ 10 - 2
raft/raft.go

@@ -202,9 +202,17 @@ func (r *raft) sendAppend(to uint64) {
 
 // sendHeartbeat sends an empty MsgApp
 func (r *raft) sendHeartbeat(to uint64) {
+	// Attach the commit as min(to.matched, r.committed).
+	// When the leader sends out heartbeat message,
+	// the receiver(follower) might not be matched with the leader
+	// or it might not have all the committed entries.
+	// The leader MUST NOT forward the follower's commit to
+	// an unmatched index.
+	commit := min(r.prs[to].match, r.raftLog.committed)
 	m := pb.Message{
-		To:   to,
-		Type: pb.MsgApp,
+		To:     to,
+		Type:   pb.MsgApp,
+		Commit: commit,
 	}
 	r.send(m)
 }

+ 13 - 3
raft/raft_test.go

@@ -911,13 +911,20 @@ func TestBcastBeat(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		sm.appendEntry(pb.Entry{})
 	}
+	// slow follower
+	sm.prs[2].match, sm.prs[2].next = 5, 6
+	// normal follower
+	sm.prs[3].match, sm.prs[3].next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
 
 	sm.Step(pb.Message{Type: pb.MsgBeat})
 	msgs := sm.readMessages()
 	if len(msgs) != 2 {
 		t.Fatalf("len(msgs) = %v, want 2", len(msgs))
 	}
-	tomap := map[uint64]bool{2: true, 3: true}
+	wantCommitMap := map[uint64]uint64{
+		2: min(sm.raftLog.committed, sm.prs[2].match),
+		3: min(sm.raftLog.committed, sm.prs[3].match),
+	}
 	for i, m := range msgs {
 		if m.Type != pb.MsgApp {
 			t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp)
@@ -928,10 +935,13 @@ func TestBcastBeat(t *testing.T) {
 		if m.LogTerm != 0 {
 			t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
 		}
-		if !tomap[m.To] {
+		if wantCommitMap[m.To] == 0 {
 			t.Fatalf("#%d: unexpected to %d", i, m.To)
 		} else {
-			delete(tomap, m.To)
+			if m.Commit != wantCommitMap[m.To] {
+				t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, wantCommitMap[m.To])
+			}
+			delete(wantCommitMap, m.To)
 		}
 		if len(m.Entries) != 0 {
 			t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))