Prechádzať zdrojové kódy

Merge pull request #10279 from tbg/leader-progress-replicate

raft: ensure leader is in ProgressStateReplicate
Xiang Li 7 rokov pred
rodič
commit
f28945ba8a
2 zmenil súbory, kde vykonal 24 pridanie a 0 odobranie
  1. 5 0
      raft/raft.go
  2. 19 0
      raft/raft_test.go

+ 5 - 0
raft/raft.go

@@ -750,6 +750,11 @@ func (r *raft) becomeLeader() {
 	r.tick = r.tickHeartbeat
 	r.lead = r.id
 	r.state = StateLeader
+	// Followers enter replicate mode when they've been successfully probed
+	// (perhaps after having received a snapshot as a result). The leader is
+	// trivially in this state. Note that r.reset() has initialized this
+	// progress with the last index already.
+	r.prs[r.id].becomeReplicate()
 
 	// Conservatively set the pendingConfIndex to the last index in the
 	// log. There may or may not be a pending config change, but it's

+ 19 - 0
raft/raft_test.go

@@ -267,11 +267,30 @@ func TestProgressResume(t *testing.T) {
 	}
 }
 
+func TestProgressLeader(t *testing.T) {
+	r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
+	r.becomeCandidate()
+	r.becomeLeader()
+	r.prs[2].becomeReplicate()
+
+	// Send proposals to r1. The first 5 entries should be appended to the log.
+	propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}}
+	for i := 0; i < 5; i++ {
+		if pr := r.prs[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 {
+			t.Errorf("unexpected progress %v", pr)
+		}
+		if err := r.Step(propMsg); err != nil {
+			t.Fatalf("proposal resulted in error: %v", err)
+		}
+	}
+}
+
 // TestProgressResumeByHeartbeatResp ensures raft.heartbeat reset progress.paused by heartbeat response.
 func TestProgressResumeByHeartbeatResp(t *testing.T) {
 	r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
 	r.becomeCandidate()
 	r.becomeLeader()
+
 	r.prs[2].Paused = true
 
 	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})