Selaa lähdekoodia

etcdserver: clean up start and stop logic of raft

kill TODO and make it more readable.
Yicheng Qin 10 vuotta sitten
vanhempi
commit
7696dd3280
3 muutettua tiedostoa jossa 67 lisäystä ja 62 poistoa
  1. 64 49
      etcdserver/raft.go
  2. 1 5
      etcdserver/raft_test.go
  3. 2 8
      etcdserver/server.go

+ 64 - 49
etcdserver/raft.go

@@ -108,67 +108,77 @@ type raftNode struct {
 	done    chan struct{}
 }
 
-func (r *raftNode) run() {
-	var syncC <-chan time.Time
-
-	defer r.stop()
-	for {
-		select {
-		case <-r.ticker:
-			r.Tick()
-		case rd := <-r.Ready():
-			if rd.SoftState != nil {
-				atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
-				if rd.RaftState == raft.StateLeader {
-					syncC = r.s.SyncTicker
-					// TODO: remove the nil checking
-					// current test utility does not provide the stats
-					if r.s.stats != nil {
-						r.s.stats.BecomeLeader()
+// start prepares and starts raftNode in a new goroutine. It is no longer safe
+// to modify the fields after it has been started.
+// TODO: Ideally raftNode should get rid of the passed in server structure.
+func (r *raftNode) start(s *EtcdServer) {
+	r.s = s
+	r.applyc = make(chan apply)
+	r.stopped = make(chan struct{})
+	r.done = make(chan struct{})
+
+	go func() {
+		var syncC <-chan time.Time
+
+		defer r.onStop()
+		for {
+			select {
+			case <-r.ticker:
+				r.Tick()
+			case rd := <-r.Ready():
+				if rd.SoftState != nil {
+					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
+					if rd.RaftState == raft.StateLeader {
+						syncC = r.s.SyncTicker
+						// TODO: remove the nil checking
+						// current test utility does not provide the stats
+						if r.s.stats != nil {
+							r.s.stats.BecomeLeader()
+						}
+					} else {
+						syncC = nil
 					}
-				} else {
-					syncC = nil
 				}
-			}
 
-			apply := apply{
-				entries:  rd.CommittedEntries,
-				snapshot: rd.Snapshot,
-				done:     make(chan struct{}),
-			}
+				apply := apply{
+					entries:  rd.CommittedEntries,
+					snapshot: rd.Snapshot,
+					done:     make(chan struct{}),
+				}
 
-			select {
-			case r.applyc <- apply:
-			case <-r.stopped:
-				return
-			}
+				select {
+				case r.applyc <- apply:
+				case <-r.stopped:
+					return
+				}
 
-			if !raft.IsEmptySnap(rd.Snapshot) {
-				if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
-					plog.Fatalf("raft save snapshot error: %v", err)
+				if !raft.IsEmptySnap(rd.Snapshot) {
+					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
+						plog.Fatalf("raft save snapshot error: %v", err)
+					}
+					r.raftStorage.ApplySnapshot(rd.Snapshot)
+					plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
 				}
-				r.raftStorage.ApplySnapshot(rd.Snapshot)
-				plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
-			}
-			if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
-				plog.Fatalf("raft save state and entries error: %v", err)
-			}
-			r.raftStorage.Append(rd.Entries)
+				if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
+					plog.Fatalf("raft save state and entries error: %v", err)
+				}
+				r.raftStorage.Append(rd.Entries)
 
-			r.s.send(rd.Messages)
+				r.s.send(rd.Messages)
 
-			select {
-			case <-apply.done:
+				select {
+				case <-apply.done:
+				case <-r.stopped:
+					return
+				}
+				r.Advance()
+			case <-syncC:
+				r.s.sync(defaultSyncTimeout)
 			case <-r.stopped:
 				return
 			}
-			r.Advance()
-		case <-syncC:
-			r.s.sync(defaultSyncTimeout)
-		case <-r.stopped:
-			return
 		}
-	}
+	}()
 }
 
 func (r *raftNode) apply() chan apply {
@@ -176,6 +186,11 @@ func (r *raftNode) apply() chan apply {
 }
 
 func (r *raftNode) stop() {
+	r.stopped <- struct{}{}
+	<-r.done
+}
+
+func (r *raftNode) onStop() {
 	r.Stop()
 	r.transport.Stop()
 	if err := r.storage.Close(); err != nil {

+ 1 - 5
etcdserver/raft_test.go

@@ -148,15 +148,11 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
 	n := newReadyNode()
 	r := raftNode{
 		Node:        n,
-		applyc:      make(chan apply),
 		storage:     &storageRecorder{},
 		raftStorage: raft.NewMemoryStorage(),
 		transport:   &nopTransporter{},
-		stopped:     make(chan struct{}),
-		done:        make(chan struct{}),
 	}
-	r.s = &EtcdServer{r: r}
-	go r.run()
+	r.start(&EtcdServer{r: r})
 	n.readyc <- raft.Ready{}
 	select {
 	case <-r.applyc:

+ 2 - 8
etcdserver/server.go

@@ -413,15 +413,9 @@ func (s *EtcdServer) run() {
 	confState := snap.Metadata.ConfState
 	snapi := snap.Metadata.Index
 	appliedi := snapi
-	// TODO: get rid of the raft initialization in etcd server
-	s.r.s = s
-	s.r.applyc = make(chan apply)
-	s.r.stopped = make(chan struct{})
-	s.r.done = make(chan struct{})
-	go s.r.run()
+	s.r.start(s)
 	defer func() {
-		s.r.stopped <- struct{}{}
-		<-s.r.done
+		s.r.stop()
 		close(s.done)
 	}()