Procházet zdrojové kódy

Merge pull request #7497 from xiang90/fix_candidate

etcdserver: candidate should wait for applying all configuration changes
Xiang Li před 8 roky
rodič
revize
df839f3b7f
3 změnil soubory, kde provedl 69 přidání a 2 odebrání
  1. 12 0
      etcdserver/raft.go
  2. 50 0
      etcdserver/raft_test.go
  3. 7 2
      etcdserver/server.go

+ 12 - 0
etcdserver/raft.go

@@ -140,6 +140,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 	go func() {
 		defer r.onStop()
 		islead := false
+		isCandidate := false
 
 		for {
 			select {
@@ -162,6 +163,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 
 					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
 					islead = rd.RaftState == raft.StateLeader
+					isCandidate = rd.RaftState == raft.StateCandidate
 					rh.updateLeadership()
 				}
 
@@ -225,7 +227,17 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					r.sendMessages(rd.Messages)
 				}
 				raftDone <- struct{}{}
+
 				r.Advance()
+
+				if isCandidate {
+					// candidate needs to wait for all pending configuration changes to be applied
+					// before continue. Or we might incorrectly count the number of votes (e.g. receive vote from
+					// a removed member).
+					// We simply wait for ALL pending entries to be applied for now.
+					// We might improve this later on if it causes unnecessary long blocking issues.
+					rh.waitForApply()
+				}
 			case <-r.stopped:
 				return
 			}

+ 50 - 0
etcdserver/raft_test.go

@@ -175,3 +175,53 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
 		t.Fatalf("failed to stop raft loop")
 	}
 }
+
+func TestCandidateBlocksByApply(t *testing.T) {
+	n := newNopReadyNode()
+
+	waitApplyc := make(chan struct{})
+
+	srv := &EtcdServer{r: raftNode{
+		Node:        n,
+		storage:     mockstorage.NewStorageRecorder(""),
+		raftStorage: raft.NewMemoryStorage(),
+		transport:   rafthttp.NewNopTransporter(),
+		ticker:      &time.Ticker{},
+	}}
+
+	rh := &raftReadyHandler{
+		updateLeadership: func() {},
+		waitForApply: func() {
+			<-waitApplyc
+		},
+	}
+
+	srv.r.start(rh)
+	defer srv.r.Stop()
+
+	// become candidate
+	n.readyc <- raft.Ready{SoftState: &raft.SoftState{RaftState: raft.StateCandidate}}
+	<-srv.r.applyc
+
+	continueC := make(chan struct{})
+	go func() {
+		n.readyc <- raft.Ready{}
+		<-srv.r.applyc
+		close(continueC)
+	}()
+
+	select {
+	case <-continueC:
+		t.Fatalf("unexpected execution: raft routine should block waiting for apply")
+	case <-time.After(time.Second):
+	}
+
+	// finish apply, unblock raft routine
+	close(waitApplyc)
+
+	select {
+	case <-continueC:
+	case <-time.After(time.Second):
+		t.Fatalf("unexpected blocking on execution")
+	}
+}

+ 7 - 2
etcdserver/server.go

@@ -608,6 +608,7 @@ type etcdProgress struct {
 type raftReadyHandler struct {
 	updateLeadership     func()
 	updateCommittedIndex func(uint64)
+	waitForApply         func()
 }
 
 func (s *EtcdServer) run() {
@@ -616,6 +617,9 @@ func (s *EtcdServer) run() {
 		plog.Panicf("get snapshot from raft storage error: %v", err)
 	}
 
+	// asynchronously accept apply packets, dispatch progress in-order
+	sched := schedule.NewFIFOScheduler()
+
 	var (
 		smu   sync.RWMutex
 		syncC <-chan time.Time
@@ -663,11 +667,12 @@ func (s *EtcdServer) run() {
 				s.setCommittedIndex(ci)
 			}
 		},
+		waitForApply: func() {
+			sched.WaitFinish(0)
+		},
 	}
 	s.r.start(rh)
 
-	// asynchronously accept apply packets, dispatch progress in-order
-	sched := schedule.NewFIFOScheduler()
 	ep := etcdProgress{
 		confState: sn.Metadata.ConfState,
 		snapi:     sn.Metadata.Index,