Browse Source

Merge pull request #6155 from gyuho/raft-leader-transfer

*: expose Raft leader transfer
Gyu-Ho Lee 9 years ago
parent
commit
de06dc1272
4 changed files with 25 additions and 10 deletions
  1. 5 4
      etcdserver/server_test.go
  2. 12 0
      raft/node.go
  3. 7 5
      raft/raft.go
  4. 1 1
      raft/util.go

+ 5 - 4
etcdserver/server_test.go

@@ -1369,10 +1369,11 @@ func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
 	n.Record(testutil.Action{Name: "Step"})
 	return nil
 }
-func (n *nodeRecorder) Status() raft.Status                              { return raft.Status{} }
-func (n *nodeRecorder) Ready() <-chan raft.Ready                         { return nil }
-func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error { return nil }
-func (n *nodeRecorder) Advance()                                         {}
+func (n *nodeRecorder) Status() raft.Status                                             { return raft.Status{} }
+func (n *nodeRecorder) Ready() <-chan raft.Ready                                        { return nil }
+func (n *nodeRecorder) TransferLeadership(ctx context.Context, lead, transferee uint64) {}
+func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error                { return nil }
+func (n *nodeRecorder) Advance()                                                        {}
 func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
 	n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
 	return &raftpb.ConfState{}

+ 12 - 0
raft/node.go

@@ -144,6 +144,9 @@ type Node interface {
 	// to match MemoryStorage.Compact.
 	ApplyConfChange(cc pb.ConfChange) *pb.ConfState
 
+	// TransferLeadership attempts to transfer leadership to the given transferee.
+	TransferLeadership(ctx context.Context, lead, transferee uint64)
+
 	// ReadIndex request a read state. The read state will be set in the ready.
 	// Read state has a read index. Once the application advances further than the read
 	// index, any linearizable read requests issued before the read request can be
@@ -485,6 +488,15 @@ func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) {
 	}
 }
 
+func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {
+	select {
+	// manually set 'from' and 'to', so that leader can voluntarily transfer its leadership
+	case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}:
+	case <-n.done:
+	case <-ctx.Done():
+	}
+}
+
 func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
 	return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
 }

+ 7 - 5
raft/raft.go

@@ -590,11 +590,6 @@ func (r *raft) Step(m pb.Message) error {
 		}
 		return nil
 	}
-	if m.Type == pb.MsgTransferLeader {
-		if r.state != StateLeader {
-			r.logger.Debugf("%x [term %d state %v] ignoring MsgTransferLeader to %x", r.id, r.Term, r.state, m.From)
-		}
-	}
 
 	switch {
 	case m.Term == 0:
@@ -874,6 +869,13 @@ func stepFollower(r *raft, m pb.Message) {
 				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
 			r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
 		}
+	case pb.MsgTransferLeader:
+		if r.lead == None {
+			r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
+			return
+		}
+		m.To = r.lead
+		r.send(m)
 	case pb.MsgTimeoutNow:
 		r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
 		r.campaign(campaignTransfer)

+ 1 - 1
raft/util.go

@@ -48,7 +48,7 @@ func max(a, b uint64) uint64 {
 
 func IsLocalMsg(msgt pb.MessageType) bool {
 	return msgt == pb.MsgHup || msgt == pb.MsgBeat || msgt == pb.MsgUnreachable ||
-		msgt == pb.MsgSnapStatus || msgt == pb.MsgCheckQuorum || msgt == pb.MsgTransferLeader
+		msgt == pb.MsgSnapStatus || msgt == pb.MsgCheckQuorum
 }
 
 func IsResponseMsg(msgt pb.MessageType) bool {