Browse Source

Merge pull request #1852 from xiang90/heartbeat

raft: add msgHeartbeat type
Xiang Li 11 years ago
parent
commit
4ebd3a0b10
5 changed files with 45 additions and 41 deletions
  1. 9 9
      raft/raft.go
  2. 2 2
      raft/raft_paper_test.go
  3. 6 6
      raft/raft_test.go
  4. 19 16
      raft/raftpb/raft.pb.go
  5. 9 8
      raft/raftpb/raft.proto

+ 9 - 9
raft/raft.go

@@ -262,7 +262,7 @@ func (r *raft) sendHeartbeat(to uint64) {
 	commit := min(r.prs[to].match, r.raftLog.committed)
 	m := pb.Message{
 		To:     to,
-		Type:   pb.MsgApp,
+		Type:   pb.MsgHeartbeat,
 		Commit: commit,
 	}
 	r.send(m)
@@ -501,9 +501,6 @@ func stepLeader(r *raft, m pb.Message) {
 		r.appendEntry(e)
 		r.bcastAppend()
 	case pb.MsgAppResp:
-		if m.Index == 0 {
-			return
-		}
 		if m.Reject {
 			log.Printf("raft: %x received msgApp rejection from %x for index %d",
 				r.id, m.From, m.Index)
@@ -530,6 +527,9 @@ func stepCandidate(r *raft, m pb.Message) {
 	case pb.MsgApp:
 		r.becomeFollower(r.Term, m.From)
 		r.handleAppendEntries(m)
+	case pb.MsgHeartbeat:
+		r.becomeFollower(r.Term, m.From)
+		r.handleHeartbeat(m)
 	case pb.MsgSnap:
 		r.becomeFollower(m.Term, m.From)
 		r.handleSnapshot(m)
@@ -561,11 +561,11 @@ func stepFollower(r *raft, m pb.Message) {
 	case pb.MsgApp:
 		r.elapsed = 0
 		r.lead = m.From
-		if m.LogTerm == 0 && m.Index == 0 && len(m.Entries) == 0 {
-			r.handleHeartbeat(m)
-		} else {
-			r.handleAppendEntries(m)
-		}
+		r.handleAppendEntries(m)
+	case pb.MsgHeartbeat:
+		r.elapsed = 0
+		r.lead = m.From
+		r.handleHeartbeat(m)
 	case pb.MsgSnap:
 		r.elapsed = 0
 		r.handleSnapshot(m)

+ 2 - 2
raft/raft_paper_test.go

@@ -127,8 +127,8 @@ func TestLeaderBcastBeat(t *testing.T) {
 	msgs := r.readMessages()
 	sort.Sort(messageSlice(msgs))
 	wmsgs := []pb.Message{
-		{From: 1, To: 2, Term: 1, Type: pb.MsgApp},
-		{From: 1, To: 3, Term: 1, Type: pb.MsgApp},
+		{From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat},
+		{From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat},
 	}
 	if !reflect.DeepEqual(msgs, wmsgs) {
 		t.Errorf("msgs = %v, want %v", msgs, wmsgs)

+ 6 - 6
raft/raft_test.go

@@ -997,8 +997,8 @@ func TestBcastBeat(t *testing.T) {
 		3: min(sm.raftLog.committed, sm.prs[3].match),
 	}
 	for i, m := range msgs {
-		if m.Type != pb.MsgApp {
-			t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp)
+		if m.Type != pb.MsgHeartbeat {
+			t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgHeartbeat)
 		}
 		if m.Index != 0 {
 			t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0)
@@ -1052,8 +1052,8 @@ func TestRecvMsgBeat(t *testing.T) {
 			t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg)
 		}
 		for _, m := range msgs {
-			if m.Type != pb.MsgApp {
-				t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgApp)
+			if m.Type != pb.MsgHeartbeat {
+				t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgHeartbeat)
 			}
 		}
 	}
@@ -1377,9 +1377,9 @@ func TestRaftNodes(t *testing.T) {
 }
 
 func ents(terms ...uint64) *raft {
-	ents := []pb.Entry{}
+	ents := []pb.Entry{{}}
 	for i, term := range terms {
-		ents = append(ents, pb.Entry{Index: uint64(i), Term: term})
+		ents = append(ents, pb.Entry{Index: uint64(i + 1), Term: term})
 	}
 
 	sm := &raft{

+ 19 - 16
raft/raftpb/raft.pb.go

@@ -69,14 +69,15 @@ func (x *EntryType) UnmarshalJSON(data []byte) error {
 type MessageType int32
 
 const (
-	MsgHup      MessageType = 0
-	MsgBeat     MessageType = 1
-	MsgProp     MessageType = 2
-	MsgApp      MessageType = 3
-	MsgAppResp  MessageType = 4
-	MsgVote     MessageType = 5
-	MsgVoteResp MessageType = 6
-	MsgSnap     MessageType = 7
+	MsgHup       MessageType = 0
+	MsgBeat      MessageType = 1
+	MsgProp      MessageType = 2
+	MsgApp       MessageType = 3
+	MsgAppResp   MessageType = 4
+	MsgVote      MessageType = 5
+	MsgVoteResp  MessageType = 6
+	MsgSnap      MessageType = 7
+	MsgHeartbeat MessageType = 8
 )
 
 var MessageType_name = map[int32]string{
@@ -88,16 +89,18 @@ var MessageType_name = map[int32]string{
 	5: "MsgVote",
 	6: "MsgVoteResp",
 	7: "MsgSnap",
+	8: "MsgHeartbeat",
 }
 var MessageType_value = map[string]int32{
-	"MsgHup":      0,
-	"MsgBeat":     1,
-	"MsgProp":     2,
-	"MsgApp":      3,
-	"MsgAppResp":  4,
-	"MsgVote":     5,
-	"MsgVoteResp": 6,
-	"MsgSnap":     7,
+	"MsgHup":       0,
+	"MsgBeat":      1,
+	"MsgProp":      2,
+	"MsgApp":       3,
+	"MsgAppResp":   4,
+	"MsgVote":      5,
+	"MsgVoteResp":  6,
+	"MsgSnap":      7,
+	"MsgHeartbeat": 8,
 }
 
 func (x MessageType) Enum() *MessageType {

+ 9 - 8
raft/raftpb/raft.proto

@@ -32,14 +32,15 @@ message Snapshot {
 }
 
 enum MessageType {
-	MsgHup      = 0;
-	MsgBeat     = 1;
-	MsgProp     = 2;
-	MsgApp      = 3;
-	MsgAppResp  = 4;
-	MsgVote     = 5;
-	MsgVoteResp = 6;
-	MsgSnap     = 7;
+	MsgHup       = 0;
+	MsgBeat      = 1;
+	MsgProp      = 2;
+	MsgApp       = 3;
+	MsgAppResp   = 4;
+	MsgVote      = 5;
+	MsgVoteResp  = 6;
+	MsgSnap      = 7;
+	MsgHeartbeat = 8;
 }
 
 message Message {