Browse Source

Merge pull request #9067 from absolute8511/optimize-raft-drop

raft: let raft step return error when proposal is dropped to allow fail-fast
Xiang Li 8 years ago
parent
commit
c5532ebbf6
5 changed files with 51 additions and 31 deletions
  1. 6 3
      raft/node_test.go
  2. 35 25
      raft/raft.go
  3. 2 1
      raft/raft_paper_test.go
  4. 6 1
      raft/raft_test.go
  5. 2 1
      raft/rawnode_test.go

+ 6 - 3
raft/node_test.go

@@ -109,8 +109,9 @@ func TestNodeStepUnblock(t *testing.T) {
 // TestNodePropose ensures that node.Propose sends the given proposal to the underlying raft.
 func TestNodePropose(t *testing.T) {
 	msgs := []raftpb.Message{}
-	appendStep := func(r *raft, m raftpb.Message) {
+	appendStep := func(r *raft, m raftpb.Message) error {
 		msgs = append(msgs, m)
+		return nil
 	}
 
 	n := newNode()
@@ -147,8 +148,9 @@ func TestNodePropose(t *testing.T) {
 // It also ensures that ReadState can be read out through ready chan.
 func TestNodeReadIndex(t *testing.T) {
 	msgs := []raftpb.Message{}
-	appendStep := func(r *raft, m raftpb.Message) {
+	appendStep := func(r *raft, m raftpb.Message) error {
 		msgs = append(msgs, m)
+		return nil
 	}
 	wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
 
@@ -284,8 +286,9 @@ func TestNodeReadIndexToOldLeader(t *testing.T) {
 // to the underlying raft.
 func TestNodeProposeConfig(t *testing.T) {
 	msgs := []raftpb.Message{}
-	appendStep := func(r *raft, m raftpb.Message) {
+	appendStep := func(r *raft, m raftpb.Message) error {
 		msgs = append(msgs, m)
+		return nil
 	}
 
 	n := newNode()

+ 35 - 25
raft/raft.go

@@ -67,6 +67,10 @@ const (
 	campaignTransfer CampaignType = "CampaignTransfer"
 )
 
+// ErrProposalDropped is returned when the proposal is ignored by some cases,
+// so that the proposer can be notified and fail fast.
+var ErrProposalDropped = errors.New("raft proposal dropped")
+
 // lockedRand is a small wrapper around rand.Rand to provide
 // synchronization. Only the methods needed by the code are exposed
 // (e.g. Intn).
@@ -878,25 +882,28 @@ func (r *raft) Step(m pb.Message) error {
 		}
 
 	default:
-		r.step(r, m)
+		err := r.step(r, m)
+		if err != nil {
+			return err
+		}
 	}
 	return nil
 }
 
-type stepFunc func(r *raft, m pb.Message)
+type stepFunc func(r *raft, m pb.Message) error
 
-func stepLeader(r *raft, m pb.Message) {
+func stepLeader(r *raft, m pb.Message) error {
 	// These message types do not require any progress for m.From.
 	switch m.Type {
 	case pb.MsgBeat:
 		r.bcastHeartbeat()
-		return
+		return nil
 	case pb.MsgCheckQuorum:
 		if !r.checkQuorumActive() {
 			r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
 			r.becomeFollower(r.Term, None)
 		}
-		return
+		return nil
 	case pb.MsgProp:
 		if len(m.Entries) == 0 {
 			r.logger.Panicf("%x stepped empty MsgProp", r.id)
@@ -905,11 +912,11 @@ func stepLeader(r *raft, m pb.Message) {
 			// If we are not currently a member of the range (i.e. this node
 			// was removed from the configuration while serving as leader),
 			// drop any new proposals.
-			return
+			return ErrProposalDropped
 		}
 		if r.leadTransferee != None {
 			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
-			return
+			return ErrProposalDropped
 		}
 
 		for i, e := range m.Entries {
@@ -925,12 +932,12 @@ func stepLeader(r *raft, m pb.Message) {
 		}
 		r.appendEntry(m.Entries...)
 		r.bcastAppend()
-		return
+		return nil
 	case pb.MsgReadIndex:
 		if r.quorum() > 1 {
 			if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
 				// Reject read only request when this leader has not committed any log entry at its term.
-				return
+				return nil
 			}
 
 			// thinking: use an interally defined context instead of the user given context.
@@ -952,14 +959,14 @@ func stepLeader(r *raft, m pb.Message) {
 			r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
 		}
 
-		return
+		return nil
 	}
 
 	// All other message types require a progress for m.From (pr).
 	pr := r.getProgress(m.From)
 	if pr == nil {
 		r.logger.Debugf("%x no progress available for %x", r.id, m.From)
-		return
+		return nil
 	}
 	switch m.Type {
 	case pb.MsgAppResp:
@@ -1015,12 +1022,12 @@ func stepLeader(r *raft, m pb.Message) {
 		}
 
 		if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
-			return
+			return nil
 		}
 
 		ackCount := r.readOnly.recvAck(m)
 		if ackCount < r.quorum() {
-			return
+			return nil
 		}
 
 		rss := r.readOnly.advance(m)
@@ -1034,7 +1041,7 @@ func stepLeader(r *raft, m pb.Message) {
 		}
 	case pb.MsgSnapStatus:
 		if pr.State != ProgressStateSnapshot {
-			return
+			return nil
 		}
 		if !m.Reject {
 			pr.becomeProbe()
@@ -1058,7 +1065,7 @@ func stepLeader(r *raft, m pb.Message) {
 	case pb.MsgTransferLeader:
 		if pr.IsLearner {
 			r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
-			return
+			return nil
 		}
 		leadTransferee := m.From
 		lastLeadTransferee := r.leadTransferee
@@ -1066,14 +1073,14 @@ func stepLeader(r *raft, m pb.Message) {
 			if lastLeadTransferee == leadTransferee {
 				r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
 					r.id, r.Term, leadTransferee, leadTransferee)
-				return
+				return nil
 			}
 			r.abortLeaderTransfer()
 			r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
 		}
 		if leadTransferee == r.id {
 			r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
-			return
+			return nil
 		}
 		// Transfer leadership to third party.
 		r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
@@ -1087,11 +1094,12 @@ func stepLeader(r *raft, m pb.Message) {
 			r.sendAppend(leadTransferee)
 		}
 	}
+	return nil
 }
 
 // stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
 // whether they respond to MsgVoteResp or MsgPreVoteResp.
-func stepCandidate(r *raft, m pb.Message) {
+func stepCandidate(r *raft, m pb.Message) error {
 	// Only handle vote responses corresponding to our candidacy (while in
 	// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
 	// our pre-candidate state).
@@ -1104,7 +1112,7 @@ func stepCandidate(r *raft, m pb.Message) {
 	switch m.Type {
 	case pb.MsgProp:
 		r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
-		return
+		return ErrProposalDropped
 	case pb.MsgApp:
 		r.becomeFollower(r.Term, m.From)
 		r.handleAppendEntries(m)
@@ -1131,17 +1139,18 @@ func stepCandidate(r *raft, m pb.Message) {
 	case pb.MsgTimeoutNow:
 		r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
 	}
+	return nil
 }
 
-func stepFollower(r *raft, m pb.Message) {
+func stepFollower(r *raft, m pb.Message) error {
 	switch m.Type {
 	case pb.MsgProp:
 		if r.lead == None {
 			r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
-			return
+			return ErrProposalDropped
 		} else if r.disableProposalForwarding {
 			r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
-			return
+			return ErrProposalDropped
 		}
 		m.To = r.lead
 		r.send(m)
@@ -1160,7 +1169,7 @@ func stepFollower(r *raft, m pb.Message) {
 	case pb.MsgTransferLeader:
 		if r.lead == None {
 			r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
-			return
+			return nil
 		}
 		m.To = r.lead
 		r.send(m)
@@ -1177,17 +1186,18 @@ func stepFollower(r *raft, m pb.Message) {
 	case pb.MsgReadIndex:
 		if r.lead == None {
 			r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
-			return
+			return nil
 		}
 		m.To = r.lead
 		r.send(m)
 	case pb.MsgReadIndexResp:
 		if len(m.Entries) != 1 {
 			r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
-			return
+			return nil
 		}
 		r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
 	}
+	return nil
 }
 
 func (r *raft) handleAppendEntries(m pb.Message) {

+ 2 - 1
raft/raft_paper_test.go

@@ -79,8 +79,9 @@ func testUpdateTermFromMessage(t *testing.T, state StateType) {
 // Reference: section 5.1
 func TestRejectStaleTermMessage(t *testing.T) {
 	called := false
-	fakeStep := func(r *raft, m pb.Message) {
+	fakeStep := func(r *raft, m pb.Message) error {
 		called = true
+		return nil
 	}
 	r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	r.step = fakeStep

+ 6 - 1
raft/raft_test.go

@@ -1231,8 +1231,9 @@ func TestPastElectionTimeout(t *testing.T) {
 // actual stepX function.
 func TestStepIgnoreOldTermMsg(t *testing.T) {
 	called := false
-	fakeStep := func(r *raft, m pb.Message) {
+	fakeStep := func(r *raft, m pb.Message) error {
 		called = true
+		return nil
 	}
 	sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
 	sm.step = fakeStep
@@ -3227,6 +3228,10 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) {
 	}
 
 	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
+	err := lead.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
+	if err != ErrProposalDropped {
+		t.Fatalf("should return drop proposal error while transferring")
+	}
 
 	if lead.prs[1].Match != 1 {
 		t.Fatalf("node 1 has match %x, want %x", lead.prs[1].Match, 1)

+ 2 - 1
raft/rawnode_test.go

@@ -190,8 +190,9 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
 // to the underlying raft. It also ensures that ReadState can be read out.
 func TestRawNodeReadIndex(t *testing.T) {
 	msgs := []raftpb.Message{}
-	appendStep := func(r *raft, m raftpb.Message) {
+	appendStep := func(r *raft, m raftpb.Message) error {
 		msgs = append(msgs, m)
+		return nil
 	}
 	wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}