Browse Source

raft: introduce MsgHeartbeatResp.

Now that heartbeats are distinct from MsgApp{,Resp}, the retries
currently performed in stepLeader's MsgAppResp section are only
performed on an actual MsgAppResp (or a new MsgProp). This means
that it may take a long time to recover from a dropped MsgAppResp
in a quiet cluster.

This commit adds a dedicated heartbeat response message. This message
does not convey the follower's current log position because the
MsgHeartbeat does not include the leaders term and index. Upon receipt
of a heartbeat response, the leader may retry the latest MsgApp if it
believes the follower to be behind.
Ben Darnell 11 years ago
parent
commit
2e1c36cdd9
5 changed files with 101 additions and 30 deletions
  1. 5 0
      raft/raft.go
  2. 62 2
      raft/raft_test.go
  3. 21 18
      raft/raftpb/raft.pb.go
  4. 10 9
      raft/raftpb/raft.proto
  5. 3 1
      raft/util.go

+ 5 - 0
raft/raft.go

@@ -494,6 +494,10 @@ func stepLeader(r *raft, m pb.Message) {
 				r.bcastAppend()
 			}
 		}
+	case pb.MsgHeartbeatResp:
+		if r.prs[m.From].match < r.raftLog.lastIndex() {
+			r.sendAppend(m.From)
+		}
 	case pb.MsgVote:
 		log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
 			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
@@ -579,6 +583,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
 
 func (r *raft) handleHeartbeat(m pb.Message) {
 	r.raftLog.commitTo(m.Commit)
+	r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp})
 }
 
 func (r *raft) handleSnapshot(m pb.Message) {

+ 62 - 2
raft/raft_test.go

@@ -803,9 +803,69 @@ func TestHandleHeartbeat(t *testing.T) {
 			t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
 		}
 		m := sm.readMessages()
-		if len(m) != 0 {
-			t.Fatalf("#%d: msg = nil, want 0", i)
+		if len(m) != 1 {
+			t.Fatalf("#%d: msg = nil, want 1", i)
 		}
+		if m[0].Type != pb.MsgHeartbeatResp {
+			t.Errorf("#%d: type = %v, want MsgHeartbeatResp", i, m[0].Type)
+		}
+	}
+}
+
+// TestHandleHeartbeatResp ensures that we re-send log entries when we get a heartbeat response.
+func TestHandleHeartbeatResp(t *testing.T) {
+	storage := NewMemoryStorage()
+	storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
+	sm := newRaft(1, []uint64{1, 2}, 5, 1, storage)
+	sm.becomeCandidate()
+	sm.becomeLeader()
+	sm.raftLog.commitTo(sm.raftLog.lastIndex())
+
+	// A heartbeat response from a node that is behind; re-send MsgApp
+	sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
+	msgs := sm.readMessages()
+	if len(msgs) != 1 {
+		t.Fatalf("len(msgs) = %d, want 1", len(msgs))
+	}
+	if msgs[0].Type != pb.MsgApp {
+		t.Errorf("type = %v, want MsgApp", msgs[0].Type)
+	}
+
+	// A second heartbeat response with no AppResp does not re-send because we are in the wait state.
+	sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
+	msgs = sm.readMessages()
+	if len(msgs) != 0 {
+		t.Fatalf("len(msgs) = %d, want 0", len(msgs))
+	}
+
+	// Send a heartbeat to reset the wait state; next heartbeat will re-send MsgApp.
+	sm.bcastHeartbeat()
+	sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
+	msgs = sm.readMessages()
+	if len(msgs) != 2 {
+		t.Fatalf("len(msgs) = %d, want 2", len(msgs))
+	}
+	if msgs[0].Type != pb.MsgHeartbeat {
+		t.Errorf("type = %v, want MsgHeartbeat", msgs[0].Type)
+	}
+	if msgs[1].Type != pb.MsgApp {
+		t.Errorf("type = %v, want MsgApp", msgs[1].Type)
+	}
+
+	// Once we have an MsgAppResp, heartbeats no longer send MsgApp.
+	sm.Step(pb.Message{
+		From:  2,
+		Type:  pb.MsgAppResp,
+		Index: msgs[1].Index + uint64(len(msgs[1].Entries)),
+	})
+	sm.bcastHeartbeat() // reset wait state
+	sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
+	msgs = sm.readMessages()
+	if len(msgs) != 1 {
+		t.Fatalf("len(msgs) = %d, want 1", len(msgs))
+	}
+	if msgs[0].Type != pb.MsgHeartbeat {
+		t.Errorf("type = %v, want MsgHeartbeat", msgs[0].Type)
 	}
 }
 

+ 21 - 18
raft/raftpb/raft.pb.go

@@ -69,15 +69,16 @@ func (x *EntryType) UnmarshalJSON(data []byte) error {
 type MessageType int32
 
 const (
-	MsgHup       MessageType = 0
-	MsgBeat      MessageType = 1
-	MsgProp      MessageType = 2
-	MsgApp       MessageType = 3
-	MsgAppResp   MessageType = 4
-	MsgVote      MessageType = 5
-	MsgVoteResp  MessageType = 6
-	MsgSnap      MessageType = 7
-	MsgHeartbeat MessageType = 8
+	MsgHup           MessageType = 0
+	MsgBeat          MessageType = 1
+	MsgProp          MessageType = 2
+	MsgApp           MessageType = 3
+	MsgAppResp       MessageType = 4
+	MsgVote          MessageType = 5
+	MsgVoteResp      MessageType = 6
+	MsgSnap          MessageType = 7
+	MsgHeartbeat     MessageType = 8
+	MsgHeartbeatResp MessageType = 9
 )
 
 var MessageType_name = map[int32]string{
@@ -90,17 +91,19 @@ var MessageType_name = map[int32]string{
 	6: "MsgVoteResp",
 	7: "MsgSnap",
 	8: "MsgHeartbeat",
+	9: "MsgHeartbeatResp",
 }
 var MessageType_value = map[string]int32{
-	"MsgHup":       0,
-	"MsgBeat":      1,
-	"MsgProp":      2,
-	"MsgApp":       3,
-	"MsgAppResp":   4,
-	"MsgVote":      5,
-	"MsgVoteResp":  6,
-	"MsgSnap":      7,
-	"MsgHeartbeat": 8,
+	"MsgHup":           0,
+	"MsgBeat":          1,
+	"MsgProp":          2,
+	"MsgApp":           3,
+	"MsgAppResp":       4,
+	"MsgVote":          5,
+	"MsgVoteResp":      6,
+	"MsgSnap":          7,
+	"MsgHeartbeat":     8,
+	"MsgHeartbeatResp": 9,
 }
 
 func (x MessageType) Enum() *MessageType {

+ 10 - 9
raft/raftpb/raft.proto

@@ -32,15 +32,16 @@ message Snapshot {
 }
 
 enum MessageType {
-	MsgHup       = 0;
-	MsgBeat      = 1;
-	MsgProp      = 2;
-	MsgApp       = 3;
-	MsgAppResp   = 4;
-	MsgVote      = 5;
-	MsgVoteResp  = 6;
-	MsgSnap      = 7;
-	MsgHeartbeat = 8;
+	MsgHup           = 0;
+	MsgBeat          = 1;
+	MsgProp          = 2;
+	MsgApp           = 3;
+	MsgAppResp       = 4;
+	MsgVote          = 5;
+	MsgVoteResp      = 6;
+	MsgSnap          = 7;
+	MsgHeartbeat     = 8;
+	MsgHeartbeatResp = 9;
 }
 
 message Message {

+ 3 - 1
raft/util.go

@@ -50,7 +50,9 @@ func max(a, b uint64) uint64 {
 
 func IsLocalMsg(m pb.Message) bool { return m.Type == pb.MsgHup || m.Type == pb.MsgBeat }
 
-func IsResponseMsg(m pb.Message) bool { return m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp }
+func IsResponseMsg(m pb.Message) bool {
+	return m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp || m.Type == pb.MsgHeartbeatResp
+}
 
 // DescribeMessage returns a concise human-readable description of a
 // Message for debugging.