소스 검색

raft: make leader transferring workable when quorum check is on

swingbach@gmail.com 9 년 전
부모
커밋
e020b2a228
3개의 변경된 파일56개의 추가작업 그리고 7개의 파일을 삭제
  1. 19 5
      raft/raft.go
  2. 2 2
      raft/raft_paper_test.go
  3. 35 0
      raft/raft_test.go

+ 19 - 5
raft/raft.go

@@ -15,6 +15,7 @@
 package raft
 
 import (
+	"bytes"
 	"errors"
 	"fmt"
 	"math"
@@ -36,6 +37,17 @@ const (
 	StateLeader
 )
 
+// Possible values for CampaignType
+const (
+	LeaderElection CampaignType = "LeaderElection"
+	LeaderTransfer CampaignType = "LeaderTransfer"
+)
+
+// CampaignType represents the type of campaigning
+// the reason we use the type of string instead of uint64
+// is because it's simpler to compare and fill in raft entries
+type CampaignType string
+
 // StateType represents the role of a node in a cluster.
 type StateType uint64
 
@@ -520,7 +532,7 @@ func (r *raft) becomeLeader() {
 	r.logger.Infof("%x became leader at term %d", r.id, r.Term)
 }
 
-func (r *raft) campaign() {
+func (r *raft) campaign(t CampaignType) {
 	r.becomeCandidate()
 	if r.quorum() == r.poll(r.id, true) {
 		r.becomeLeader()
@@ -532,7 +544,7 @@ func (r *raft) campaign() {
 		}
 		r.logger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d",
 			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), id, r.Term)
-		r.send(pb.Message{To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
+		r.send(pb.Message{To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Entries: []pb.Entry{{Data: []byte(t)}}})
 	}
 }
 
@@ -557,7 +569,7 @@ func (r *raft) Step(m pb.Message) error {
 	if m.Type == pb.MsgHup {
 		if r.state != StateLeader {
 			r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
-			r.campaign()
+			r.campaign(LeaderElection)
 		} else {
 			r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
 		}
@@ -575,7 +587,9 @@ func (r *raft) Step(m pb.Message) error {
 	case m.Term > r.Term:
 		lead := m.From
 		if m.Type == pb.MsgVote {
-			if r.checkQuorum && r.state != StateCandidate && r.electionElapsed < r.electionTimeout {
+			force := len(m.Entries) == 1 && bytes.Equal(m.Entries[0].Data, []byte(LeaderTransfer))
+			inLease := r.checkQuorum && r.state != StateCandidate && r.electionElapsed < r.electionTimeout
+			if !force && inLease {
 				// If a server receives a RequestVote request within the minimum election timeout
 				// of hearing from a current leader, it does not update its term or grant its vote
 				r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored vote from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
@@ -842,7 +856,7 @@ func stepFollower(r *raft, m pb.Message) {
 		}
 	case pb.MsgTimeoutNow:
 		r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
-		r.campaign()
+		r.campaign(LeaderTransfer)
 	case pb.MsgReadIndex:
 		if r.lead == None {
 			r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)

+ 2 - 2
raft/raft_paper_test.go

@@ -175,8 +175,8 @@ func testNonleaderStartElection(t *testing.T, state StateType) {
 	msgs := r.readMessages()
 	sort.Sort(messageSlice(msgs))
 	wmsgs := []pb.Message{
-		{From: 1, To: 2, Term: 2, Type: pb.MsgVote},
-		{From: 1, To: 3, Term: 2, Type: pb.MsgVote},
+		{From: 1, To: 2, Term: 2, Type: pb.MsgVote, Entries: []pb.Entry{{Data: []byte(LeaderElection)}}},
+		{From: 1, To: 3, Term: 2, Type: pb.MsgVote, Entries: []pb.Entry{{Data: []byte(LeaderElection)}}},
 	}
 	if !reflect.DeepEqual(msgs, wmsgs) {
 		t.Errorf("msgs = %v, want %v", msgs, wmsgs)

+ 35 - 0
raft/raft_test.go

@@ -2208,6 +2208,41 @@ func TestLeaderTransferToUpToDateNode(t *testing.T) {
 	checkLeaderTransferState(t, lead, StateLeader, 1)
 }
 
+// TestLeaderTransferWithCheckQuorum ensures transferring leader still works
+// even the current leader is still under its leader lease
+func TestLeaderTransferWithCheckQuorum(t *testing.T) {
+	nt := newNetwork(nil, nil, nil)
+	for i := 1; i < 4; i++ {
+		r := nt.peers[uint64(i)].(*raft)
+		r.checkQuorum = true
+	}
+
+	f := nt.peers[2].(*raft)
+	for i := 0; i < f.electionTimeout; i++ {
+		f.tick()
+	}
+
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	lead := nt.peers[1].(*raft)
+
+	if lead.lead != 1 {
+		t.Fatalf("after election leader is %x, want 1", lead.lead)
+	}
+
+	// Transfer leadership to 2.
+	nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
+
+	checkLeaderTransferState(t, lead, StateFollower, 2)
+
+	// After some log replication, transfer leadership back to 1.
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
+
+	nt.send(pb.Message{From: 1, To: 2, Type: pb.MsgTransferLeader})
+
+	checkLeaderTransferState(t, lead, StateLeader, 1)
+}
+
 func TestLeaderTransferToSlowFollower(t *testing.T) {
 	defaultLogger.EnableDebug()
 	nt := newNetwork(nil, nil, nil)