浏览代码

Merge pull request #4916 from es-chow/transfer-leader

raft: transfer leader feature
Xiang Li 9 年之前
父节点
当前提交
b70e6a6bf1
共有 9 个文件被更改,包括 378 次插入38 次删除
  1. 2 2
      raft/node.go
  2. 1 1
      raft/node_test.go
  3. 73 2
      raft/raft.go
  4. 255 0
      raft/raft_test.go
  5. 32 26
      raft/raftpb/raft.pb.go
  6. 2 0
      raft/raftpb/raft.proto
  7. 7 2
      raft/rawnode.go
  8. 1 1
      raft/rawnode_test.go
  9. 5 4
      raft/util.go

+ 2 - 2
raft/node.go

@@ -306,7 +306,7 @@ func (n *node) run(r *raft) {
 			r.Step(m)
 		case m := <-n.recvc:
 			// filter out response message from unknown From.
-			if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m) {
+			if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m.Type) {
 				r.Step(m) // raft never returns an error
 			}
 		case cc := <-n.confc:
@@ -392,7 +392,7 @@ func (n *node) Propose(ctx context.Context, data []byte) error {
 
 func (n *node) Step(ctx context.Context, m pb.Message) error {
 	// ignore unexpected local messages receiving over network
-	if IsLocalMsg(m) {
+	if IsLocalMsg(m.Type) {
 		// TODO: return an error?
 		return nil
 	}

+ 1 - 1
raft/node_test.go

@@ -42,7 +42,7 @@ func TestNodeStep(t *testing.T) {
 				t.Errorf("%d: cannot receive %s on propc chan", msgt, msgn)
 			}
 		} else {
-			if msgt == raftpb.MsgBeat || msgt == raftpb.MsgHup || msgt == raftpb.MsgUnreachable || msgt == raftpb.MsgSnapStatus || msgt == raftpb.MsgCheckQuorum {
+			if IsLocalMsg(msgt) {
 				select {
 				case <-n.recvc:
 					t.Errorf("%d: step should ignore %s", msgt, msgn)

+ 73 - 2
raft/raft.go

@@ -156,7 +156,9 @@ type raft struct {
 
 	// the leader id
 	lead uint64
-
+	// leadTransferee is id of the leader transfer target when its value is not zero.
+	// Follow the procedure defined in raft thesis 3.10.
+	leadTransferee uint64
 	// New configuration is ignored if there exists unapplied configuration.
 	pendingConf bool
 
@@ -397,6 +399,8 @@ func (r *raft) reset(term uint64) {
 	r.heartbeatElapsed = 0
 	r.resetRandomizedElectionTimeout()
 
+	r.leadTransferee = None
+
 	r.votes = make(map[uint64]bool)
 	for id := range r.prs {
 		r.prs[id] = &Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight)}
@@ -442,6 +446,10 @@ func (r *raft) tickHeartbeat() {
 		if r.checkQuorum {
 			r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
 		}
+		// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
+		if r.state == StateLeader && r.leadTransferee != None {
+			r.abortLeaderTransfer()
+		}
 	}
 
 	if r.state != StateLeader {
@@ -547,6 +555,11 @@ func (r *raft) Step(m pb.Message) error {
 		}
 		return nil
 	}
+	if m.Type == pb.MsgTransferLeader {
+		if r.state != StateLeader {
+			r.logger.Debugf("%x [term %d state %v] ignoring MsgTransferLeader to %x", r.id, r.Term, r.state, m.From)
+		}
+	}
 
 	switch {
 	case m.Term == 0:
@@ -594,6 +607,11 @@ func stepLeader(r *raft, m pb.Message) {
 			// drop any new proposals.
 			return
 		}
+		if r.leadTransferee != None {
+			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
+			return
+		}
+
 		for i, e := range m.Entries {
 			if e.Type == pb.EntryConfChange {
 				if r.pendingConf {
@@ -615,7 +633,7 @@ func stepLeader(r *raft, m pb.Message) {
 	// All other message types require a progress for m.From (pr).
 	pr, prOk := r.prs[m.From]
 	if !prOk {
-		r.logger.Debugf("no progress available for %x", m.From)
+		r.logger.Debugf("%x no progress available for %x", r.id, m.From)
 		return
 	}
 	switch m.Type {
@@ -652,6 +670,11 @@ func stepLeader(r *raft, m pb.Message) {
 					// an update before, send it now.
 					r.sendAppend(m.From)
 				}
+				// Transfer leadership is in progress.
+				if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
+					r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
+					r.sendTimeoutNow(m.From)
+				}
 			}
 		}
 	case pb.MsgHeartbeatResp:
@@ -687,6 +710,37 @@ func stepLeader(r *raft, m pb.Message) {
 			pr.becomeProbe()
 		}
 		r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
+	case pb.MsgTransferLeader:
+		leadTransferee := m.From
+		lastLeadTransferee := r.leadTransferee
+		if lastLeadTransferee != None {
+			if lastLeadTransferee == leadTransferee {
+				r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
+					r.id, r.Term, leadTransferee, leadTransferee)
+				return
+			}
+			r.abortLeaderTransfer()
+			r.logger.Infof("%x [term %d] abort transfer leadership to %x", r.id, r.Term, lastLeadTransferee)
+		}
+		if leadTransferee == r.id {
+			if lastLeadTransferee == None {
+				r.logger.Debugf("%x is already leader. Ignored transfer leadership to %x", r.id, r.id)
+			} else {
+				r.logger.Debugf("%x abort transfer leadership to %x, transfer to current leader %x.", r.id, lastLeadTransferee, r.id)
+			}
+			return
+		}
+		// Transfer leadership to third party.
+		r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
+		// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
+		r.electionElapsed = 0
+		r.leadTransferee = leadTransferee
+		if pr.Match == r.raftLog.lastIndex() {
+			r.sendTimeoutNow(leadTransferee)
+			r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
+		} else {
+			r.sendAppend(leadTransferee)
+		}
 	}
 }
 
@@ -718,6 +772,8 @@ func stepCandidate(r *raft, m pb.Message) {
 		case len(r.votes) - gr:
 			r.becomeFollower(r.Term, None)
 		}
+	case pb.MsgTimeoutNow:
+		r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
 	}
 }
 
@@ -753,6 +809,9 @@ func stepFollower(r *raft, m pb.Message) {
 				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
 			r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
 		}
+	case pb.MsgTimeoutNow:
+		r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
+		r.campaign()
 	}
 }
 
@@ -846,6 +905,10 @@ func (r *raft) removeNode(id uint64) {
 	if r.maybeCommit() {
 		r.bcastAppend()
 	}
+	// If the removed node is the leadTransferee, then abort the leadership transfering.
+	if r.state == StateLeader && r.leadTransferee == id {
+		r.abortLeaderTransfer()
+	}
 }
 
 func (r *raft) resetPendingConf() { r.pendingConf = false }
@@ -900,3 +963,11 @@ func (r *raft) checkQuorumActive() bool {
 
 	return act >= r.quorum()
 }
+
+func (r *raft) sendTimeoutNow(to uint64) {
+	r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
+}
+
+func (r *raft) abortLeaderTransfer() {
+	r.leadTransferee = None
+}

+ 255 - 0
raft/raft_test.go

@@ -1911,6 +1911,261 @@ func TestCommitAfterRemoveNode(t *testing.T) {
 	}
 }
 
+// TestLeaderTransferToUpToDateNode verifies transfering should succeed
+// if the transferee has the most up-to-date log entires when transfer starts.
+func TestLeaderTransferToUpToDateNode(t *testing.T) {
+	nt := newNetwork(nil, nil, nil)
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	lead := nt.peers[1].(*raft)
+
+	if lead.lead != 1 {
+		t.Fatalf("after election leader is %x, want 1", lead.lead)
+	}
+
+	// Transfer leadership to 2.
+	nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
+
+	checkLeaderTransferState(t, lead, StateFollower, 2)
+
+	// After some log replication, transfer leadership back to 1.
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
+
+	nt.send(pb.Message{From: 1, To: 2, Type: pb.MsgTransferLeader})
+
+	checkLeaderTransferState(t, lead, StateLeader, 1)
+}
+
+func TestLeaderTransferToSlowFollower(t *testing.T) {
+	defaultLogger.EnableDebug()
+	nt := newNetwork(nil, nil, nil)
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	nt.isolate(3)
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
+
+	nt.recover()
+	lead := nt.peers[1].(*raft)
+	if lead.prs[3].Match != 1 {
+		t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs[3].Match, 1)
+	}
+
+	// Transfer leadership to 3 when node 3 is lack of log.
+	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
+
+	checkLeaderTransferState(t, lead, StateFollower, 3)
+}
+
+func TestLeaderTransferAfterSnapshot(t *testing.T) {
+	nt := newNetwork(nil, nil, nil)
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	nt.isolate(3)
+
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
+	lead := nt.peers[1].(*raft)
+	nextEnts(lead, nt.storage[1])
+	nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil)
+	nt.storage[1].Compact(lead.raftLog.applied)
+
+	nt.recover()
+	if lead.prs[3].Match != 1 {
+		t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs[3].Match, 1)
+	}
+
+	// Transfer leadership to 3 when node 3 is lack of snapshot.
+	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
+	// Send pb.MsgHeartbeatResp to leader to trigger a snapshot for node 3.
+	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgHeartbeatResp})
+
+	checkLeaderTransferState(t, lead, StateFollower, 3)
+}
+
+func TestLeaderTransferToSelf(t *testing.T) {
+	nt := newNetwork(nil, nil, nil)
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	lead := nt.peers[1].(*raft)
+
+	// Transfer leadership to self, there will be noop.
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgTransferLeader})
+	checkLeaderTransferState(t, lead, StateLeader, 1)
+}
+
+func TestLeaderTransferToNonExistingNode(t *testing.T) {
+	nt := newNetwork(nil, nil, nil)
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	lead := nt.peers[1].(*raft)
+	// Transfer leadership to non-existing node, there will be noop.
+	nt.send(pb.Message{From: 4, To: 1, Type: pb.MsgTransferLeader})
+	checkLeaderTransferState(t, lead, StateLeader, 1)
+}
+
+func TestLeaderTransferTimeout(t *testing.T) {
+	nt := newNetwork(nil, nil, nil)
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	nt.isolate(3)
+
+	lead := nt.peers[1].(*raft)
+
+	// Transfer leadership to isolated node, wait for timeout.
+	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
+	if lead.leadTransferee != 3 {
+		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
+	}
+	for i := 0; i < lead.heartbeatTimeout; i++ {
+		lead.tick()
+	}
+	if lead.leadTransferee != 3 {
+		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
+	}
+
+	for i := 0; i < lead.electionTimeout-lead.heartbeatTimeout; i++ {
+		lead.tick()
+	}
+
+	checkLeaderTransferState(t, lead, StateLeader, 1)
+}
+
+func TestLeaderTransferIgnoreProposal(t *testing.T) {
+	nt := newNetwork(nil, nil, nil)
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	nt.isolate(3)
+
+	lead := nt.peers[1].(*raft)
+
+	// Transfer leadership to isolated node to let transfer pending, then send proposal.
+	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
+	if lead.leadTransferee != 3 {
+		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
+	}
+
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
+
+	if lead.prs[1].Match != 1 {
+		t.Fatalf("node 1 has match %x, want %x", lead.prs[1].Match, 1)
+	}
+}
+
+func TestLeaderTransferReceiveHigherTermVote(t *testing.T) {
+	nt := newNetwork(nil, nil, nil)
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	nt.isolate(3)
+
+	lead := nt.peers[1].(*raft)
+
+	// Transfer leadership to isolated node to let transfer pending.
+	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
+	if lead.leadTransferee != 3 {
+		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
+	}
+
+	nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup, Index: 1, Term: 2})
+
+	checkLeaderTransferState(t, lead, StateFollower, 2)
+}
+
+func TestLeaderTransferRemoveNode(t *testing.T) {
+	nt := newNetwork(nil, nil, nil)
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	nt.ignore(pb.MsgTimeoutNow)
+
+	lead := nt.peers[1].(*raft)
+
+	// The leadTransferee is removed when leadship transfering.
+	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
+	if lead.leadTransferee != 3 {
+		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
+	}
+
+	lead.removeNode(3)
+
+	checkLeaderTransferState(t, lead, StateLeader, 1)
+}
+
+// TestLeaderTransferBack verifies leadership can transfer back to self when last transfer is pending.
+func TestLeaderTransferBack(t *testing.T) {
+	nt := newNetwork(nil, nil, nil)
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	nt.isolate(3)
+
+	lead := nt.peers[1].(*raft)
+
+	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
+	if lead.leadTransferee != 3 {
+		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
+	}
+
+	// Transfer leadership back to self.
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgTransferLeader})
+
+	checkLeaderTransferState(t, lead, StateLeader, 1)
+}
+
+// TestLeaderTransferSecondTransferToAnotherNode verifies leader can transfer to another node
+// when last transfer is pending.
+func TestLeaderTransferSecondTransferToAnotherNode(t *testing.T) {
+	nt := newNetwork(nil, nil, nil)
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	nt.isolate(3)
+
+	lead := nt.peers[1].(*raft)
+
+	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
+	if lead.leadTransferee != 3 {
+		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
+	}
+
+	// Transfer leadership to another node.
+	nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
+
+	checkLeaderTransferState(t, lead, StateFollower, 2)
+}
+
+// TestLeaderTransferSecondTransferToSameNode verifies second transfer leader request
+// to the same node should not extend the timeout while the first one is pending.
+func TestLeaderTransferSecondTransferToSameNode(t *testing.T) {
+	nt := newNetwork(nil, nil, nil)
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	nt.isolate(3)
+
+	lead := nt.peers[1].(*raft)
+
+	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
+	if lead.leadTransferee != 3 {
+		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
+	}
+
+	for i := 0; i < lead.heartbeatTimeout; i++ {
+		lead.tick()
+	}
+	// Second transfer leadership request to the same node.
+	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
+
+	for i := 0; i < lead.electionTimeout-lead.heartbeatTimeout; i++ {
+		lead.tick()
+	}
+
+	checkLeaderTransferState(t, lead, StateLeader, 1)
+}
+
+func checkLeaderTransferState(t *testing.T, r *raft, state StateType, lead uint64) {
+	if r.state != state || r.lead != lead {
+		t.Fatalf("after transfering, node has state %v lead %v, want state %v lead %v", r.state, r.lead, state, lead)
+	}
+	if r.leadTransferee != None {
+		t.Fatalf("after transfering, node has leadTransferee %v, want leadTransferee %v", r.leadTransferee, None)
+	}
+}
+
 func ents(terms ...uint64) *raft {
 	storage := NewMemoryStorage()
 	for i, term := range terms {

+ 32 - 26
raft/raftpb/raft.pb.go

@@ -70,19 +70,21 @@ func (x *EntryType) UnmarshalJSON(data []byte) error {
 type MessageType int32
 
 const (
-	MsgHup           MessageType = 0
-	MsgBeat          MessageType = 1
-	MsgProp          MessageType = 2
-	MsgApp           MessageType = 3
-	MsgAppResp       MessageType = 4
-	MsgVote          MessageType = 5
-	MsgVoteResp      MessageType = 6
-	MsgSnap          MessageType = 7
-	MsgHeartbeat     MessageType = 8
-	MsgHeartbeatResp MessageType = 9
-	MsgUnreachable   MessageType = 10
-	MsgSnapStatus    MessageType = 11
-	MsgCheckQuorum   MessageType = 12
+	MsgHup            MessageType = 0
+	MsgBeat           MessageType = 1
+	MsgProp           MessageType = 2
+	MsgApp            MessageType = 3
+	MsgAppResp        MessageType = 4
+	MsgVote           MessageType = 5
+	MsgVoteResp       MessageType = 6
+	MsgSnap           MessageType = 7
+	MsgHeartbeat      MessageType = 8
+	MsgHeartbeatResp  MessageType = 9
+	MsgUnreachable    MessageType = 10
+	MsgSnapStatus     MessageType = 11
+	MsgCheckQuorum    MessageType = 12
+	MsgTransferLeader MessageType = 13
+	MsgTimeoutNow     MessageType = 14
 )
 
 var MessageType_name = map[int32]string{
@@ -99,21 +101,25 @@ var MessageType_name = map[int32]string{
 	10: "MsgUnreachable",
 	11: "MsgSnapStatus",
 	12: "MsgCheckQuorum",
+	13: "MsgTransferLeader",
+	14: "MsgTimeoutNow",
 }
 var MessageType_value = map[string]int32{
-	"MsgHup":           0,
-	"MsgBeat":          1,
-	"MsgProp":          2,
-	"MsgApp":           3,
-	"MsgAppResp":       4,
-	"MsgVote":          5,
-	"MsgVoteResp":      6,
-	"MsgSnap":          7,
-	"MsgHeartbeat":     8,
-	"MsgHeartbeatResp": 9,
-	"MsgUnreachable":   10,
-	"MsgSnapStatus":    11,
-	"MsgCheckQuorum":   12,
+	"MsgHup":            0,
+	"MsgBeat":           1,
+	"MsgProp":           2,
+	"MsgApp":            3,
+	"MsgAppResp":        4,
+	"MsgVote":           5,
+	"MsgVoteResp":       6,
+	"MsgSnap":           7,
+	"MsgHeartbeat":      8,
+	"MsgHeartbeatResp":  9,
+	"MsgUnreachable":    10,
+	"MsgSnapStatus":     11,
+	"MsgCheckQuorum":    12,
+	"MsgTransferLeader": 13,
+	"MsgTimeoutNow":     14,
 }
 
 func (x MessageType) Enum() *MessageType {

+ 2 - 0
raft/raftpb/raft.proto

@@ -46,6 +46,8 @@ enum MessageType {
 	MsgUnreachable     = 10;
 	MsgSnapStatus      = 11;
 	MsgCheckQuorum     = 12;
+	MsgTransferLeader  = 13;
+	MsgTimeoutNow      = 14;
 }
 
 message Message {

+ 7 - 2
raft/rawnode.go

@@ -168,10 +168,10 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
 // Step advances the state machine using the given message.
 func (rn *RawNode) Step(m pb.Message) error {
 	// ignore unexpected local messages receiving over network
-	if IsLocalMsg(m) {
+	if IsLocalMsg(m.Type) {
 		return ErrStepLocalMsg
 	}
-	if _, ok := rn.raft.prs[m.From]; ok || !IsResponseMsg(m) {
+	if _, ok := rn.raft.prs[m.From]; ok || !IsResponseMsg(m.Type) {
 		return rn.raft.Step(m)
 	}
 	return ErrStepPeerNotFound
@@ -226,3 +226,8 @@ func (rn *RawNode) ReportSnapshot(id uint64, status SnapshotStatus) {
 
 	_ = rn.raft.Step(pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej})
 }
+
+// TransferLeader tries to transfer leadership to the given transferee.
+func (rn *RawNode) TransferLeader(transferee uint64) {
+	_ = rn.raft.Step(pb.Message{Type: pb.MsgTransferLeader, From: transferee})
+}

+ 1 - 1
raft/rawnode_test.go

@@ -33,7 +33,7 @@ func TestRawNodeStep(t *testing.T) {
 		msgt := raftpb.MessageType(i)
 		err = rawNode.Step(raftpb.Message{Type: msgt})
 		// LocalMsg should be ignored.
-		if msgt == raftpb.MsgBeat || msgt == raftpb.MsgHup || msgt == raftpb.MsgUnreachable || msgt == raftpb.MsgSnapStatus {
+		if IsLocalMsg(msgt) {
 			if err != ErrStepLocalMsg {
 				t.Errorf("%d: step should ignore %s", msgt, msgn)
 			}

+ 5 - 4
raft/util.go

@@ -46,12 +46,13 @@ func max(a, b uint64) uint64 {
 	return b
 }
 
-func IsLocalMsg(m pb.Message) bool {
-	return m.Type == pb.MsgHup || m.Type == pb.MsgBeat || m.Type == pb.MsgUnreachable || m.Type == pb.MsgSnapStatus || m.Type == pb.MsgCheckQuorum
+func IsLocalMsg(msgt pb.MessageType) bool {
+	return msgt == pb.MsgHup || msgt == pb.MsgBeat || msgt == pb.MsgUnreachable ||
+		msgt == pb.MsgSnapStatus || msgt == pb.MsgCheckQuorum || msgt == pb.MsgTransferLeader
 }
 
-func IsResponseMsg(m pb.Message) bool {
-	return m.Type == pb.MsgAppResp || m.Type == pb.MsgVoteResp || m.Type == pb.MsgHeartbeatResp || m.Type == pb.MsgUnreachable
+func IsResponseMsg(msgt pb.MessageType) bool {
+	return msgt == pb.MsgAppResp || msgt == pb.MsgVoteResp || msgt == pb.MsgHeartbeatResp || msgt == pb.MsgUnreachable
 }
 
 // EntryFormatter can be implemented by the application to provide human-readable formatting