浏览代码

Merge pull request #7810 from gyuho/sync-with-apply

etcdserver: ensure waitForApply sync with applyAll
Gyu-Ho Lee 8 年之前
父节点
当前提交
2af1605db3
共有 3 个文件被更改,包括 7 次插入17 次删除
  1. 4 1
      etcdserver/raft.go
  2. 3 12
      etcdserver/raft_test.go
  3. 0 4
      etcdserver/server.go

+ 4 - 1
etcdserver/raft.go

@@ -257,7 +257,10 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 						}
 					}
 					if waitApply {
-						rh.waitForApply()
+						// blocks until 'applyAll' calls 'applyWait.Trigger'
+						// to be in sync with scheduled config-change job
+						// (assume raftDone has cap of 1)
+						raftDone <- struct{}{}
 					}
 
 					// gofail: var raftBeforeFollowerSend struct{}

+ 3 - 12
etcdserver/raft_test.go

@@ -180,8 +180,6 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
 func TestConfgChangeBlocksApply(t *testing.T) {
 	n := newNopReadyNode()
 
-	waitApplyc := make(chan struct{})
-
 	r := newRaftNode(raftNodeConfig{
 		Node:        n,
 		storage:     mockstorage.NewStorageRecorder(""),
@@ -190,21 +188,14 @@ func TestConfgChangeBlocksApply(t *testing.T) {
 	})
 	srv := &EtcdServer{r: *r}
 
-	rh := &raftReadyHandler{
-		updateLeadership: func(bool) {},
-		waitForApply: func() {
-			<-waitApplyc
-		},
-	}
-
-	srv.r.start(rh)
+	srv.r.start(&raftReadyHandler{updateLeadership: func(bool) {}})
 	defer srv.r.Stop()
 
 	n.readyc <- raft.Ready{
 		SoftState:        &raft.SoftState{RaftState: raft.StateFollower},
 		CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
 	}
-	<-srv.r.applyc
+	ap := <-srv.r.applyc
 
 	continueC := make(chan struct{})
 	go func() {
@@ -220,7 +211,7 @@ func TestConfgChangeBlocksApply(t *testing.T) {
 	}
 
 	// finish apply, unblock raft routine
-	close(waitApplyc)
+	<-ap.raftDone
 
 	select {
 	case <-continueC:

+ 0 - 4
etcdserver/server.go

@@ -614,7 +614,6 @@ type etcdProgress struct {
 type raftReadyHandler struct {
 	updateLeadership     func(newLeader bool)
 	updateCommittedIndex func(uint64)
-	waitForApply         func()
 }
 
 func (s *EtcdServer) run() {
@@ -676,9 +675,6 @@ func (s *EtcdServer) run() {
 				s.setCommittedIndex(ci)
 			}
 		},
-		waitForApply: func() {
-			sched.WaitFinish(0)
-		},
 	}
 	s.r.start(rh)