Browse Source

Merge pull request #1200 from coreos/raft_heartbeat

raft: heartbeat is only response for maintaining leader dominance
Xiang Li 11 years ago
parent
commit
ce70e63cc6
2 changed files with 27 additions and 66 deletions
  1. 3 8
      raft/raft.go
  2. 24 58
      raft/raft_test.go

+ 3 - 8
raft/raft.go

@@ -190,16 +190,11 @@ func (r *raft) sendAppend(to int64) {
 	r.send(m)
 }
 
-// sendHeartbeat sends RRPC, without entries to the given peer.
+// sendHeartbeat sends an empty msgApp
 func (r *raft) sendHeartbeat(to int64) {
-	pr := r.prs[to]
-	index := max(pr.next-1, r.raftLog.offset)
 	m := pb.Message{
-		To:      to,
-		Type:    msgApp,
-		Index:   index,
-		LogTerm: r.raftLog.term(index),
-		Commit:  r.raftLog.committed,
+		To:   to,
+		Type: msgApp,
 	}
 	r.send(m)
 }

+ 24 - 58
raft/raft_test.go

@@ -160,15 +160,6 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
 
 	tt.recover()
 
-	// send out a heartbeat
-	// after append a ChangeTerm entry from the current term, all entries
-	// should be committed
-	tt.send(pb.Message{From: 2, To: 2, Type: msgBeat})
-
-	if sm.raftLog.committed != 4 {
-		t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4)
-	}
-
 	// still be able to append a entry
 	tt.send(pb.Message{From: 2, To: 2, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
 
@@ -729,17 +720,16 @@ func TestLeaderAppResp(t *testing.T) {
 }
 
 // When the leader receives a heartbeat tick, it should
-// send a msgApp with m.Index = max(progress.next-1,log.offset) and empty
-// entries.
+// send a msgApp with m.Index = 0, m.LogTerm=0 and empty entries.
 func TestBcastBeat(t *testing.T) {
 	offset := int64(1000)
 	// make a state machine with log.offset = 1000
 	s := pb.Snapshot{
 		Index: offset,
 		Term:  1,
-		Nodes: []int64{1, 2},
+		Nodes: []int64{1, 2, 3},
 	}
-	sm := newRaft(1, []int64{1, 2}, 0, 0)
+	sm := newRaft(1, []int64{1, 2, 3}, 0, 0)
 	sm.Term = 1
 	sm.restore(s)
 
@@ -749,40 +739,26 @@ func TestBcastBeat(t *testing.T) {
 		sm.appendEntry(pb.Entry{})
 	}
 
-	tests := []struct {
-		pnext  int64
-		windex int64
-		wterm  int64
-		wto    int64
-	}{
-		{offset + 1, offset, 1, 2},
-		{offset + 2, offset + 1, 2, 2},
-		// pr.next -1 < offset
-		{offset, offset, 1, 2},
-		{offset - 1, offset, 1, 2},
+	sm.Step(pb.Message{Type: msgBeat})
+	msgs := sm.ReadMessages()
+	if len(msgs) != 2 {
+		t.Fatalf("len(msgs) = %v, want 1", len(msgs))
 	}
-
-	for i, tt := range tests {
-		sm.prs[2].match = 0
-		sm.prs[2].next = tt.pnext
-
-		sm.Step(pb.Message{Type: msgBeat})
-		msgs := sm.ReadMessages()
-		if len(msgs) != 1 {
-			t.Fatalf("#%d: len(msgs) = %v, want 1", i, len(msgs))
-		}
-		m := msgs[0]
+	tomap := map[int64]bool{2: true, 3: true}
+	for i, m := range msgs {
 		if m.Type != msgApp {
 			t.Fatalf("#%d: type = %v, want = %v", i, m.Type, msgApp)
 		}
-		if m.Index != tt.windex {
-			t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, tt.windex)
+		if m.Index != 0 {
+			t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0)
 		}
-		if m.LogTerm != tt.wterm {
-			t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, tt.wterm)
+		if m.LogTerm != 0 {
+			t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
 		}
-		if m.To != tt.wto {
-			t.Fatalf("#%d: to = %d, want %d", i, m.To, tt.wto)
+		if !tomap[m.To] {
+			t.Fatalf("#%d: unexpected to %d", i, m.To)
+		} else {
+			delete(tomap, m.To)
 		}
 		if len(m.Entries) != 0 {
 			t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
@@ -877,26 +853,16 @@ func TestProvideSnap(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeLeader()
 
-	sm.Step(pb.Message{From: 1, To: 1, Type: msgBeat})
-	msgs := sm.ReadMessages()
-	if len(msgs) != 1 {
-		t.Fatalf("len(msgs) = %d, want 1", len(msgs))
-	}
-	m := msgs[0]
-	if m.Type != msgApp {
-		t.Errorf("m.Type = %v, want %v", m.Type, msgApp)
-	}
-
 	// force set the next of node 1, so that
 	// node 1 needs a snapshot
 	sm.prs[2].next = sm.raftLog.offset
 
 	sm.Step(pb.Message{From: 2, To: 1, Type: msgAppResp, Index: -1, Denied: true})
-	msgs = sm.ReadMessages()
+	msgs := sm.ReadMessages()
 	if len(msgs) != 1 {
 		t.Fatalf("len(msgs) = %d, want 1", len(msgs))
 	}
-	m = msgs[0]
+	m := msgs[0]
 	if m.Type != msgSnap {
 		t.Errorf("m.Type = %v, want %v", m.Type, msgSnap)
 	}
@@ -931,17 +897,17 @@ func TestSlowNodeRestore(t *testing.T) {
 	lead.compact(nil)
 
 	nt.recover()
-	nt.send(pb.Message{From: 1, To: 1, Type: msgBeat})
-
+	// trigger a snapshot
+	nt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{}}})
 	follower := nt.peers[3].(*raft)
 	if !reflect.DeepEqual(follower.raftLog.snapshot, lead.raftLog.snapshot) {
 		t.Errorf("follower.snap = %+v, want %+v", follower.raftLog.snapshot, lead.raftLog.snapshot)
 	}
 
-	committed := follower.raftLog.lastIndex()
+	// trigger a commit
 	nt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{}}})
-	if follower.raftLog.committed != committed+1 {
-		t.Errorf("follower.comitted = %d, want %d", follower.raftLog.committed, committed+1)
+	if follower.raftLog.committed != lead.raftLog.committed {
+		t.Errorf("follower.comitted = %d, want %d", follower.raftLog.committed, lead.raftLog.committed)
 	}
 }