Browse Source

Merge pull request #1891 from yichengq/257

etcdserver: init state before run loop correctly
Yicheng Qin 11 years ago
parent
commit
9c8f5c9535
2 changed files with 25 additions and 19 deletions
  1. 22 18
      etcdserver/server.go
  2. 3 1
      etcdserver/server_test.go

+ 22 - 18
etcdserver/server.go

@@ -384,12 +384,19 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 
 func (s *EtcdServer) run() {
 	var syncC <-chan time.Time
-	// snapi indicates the index of the last submitted snapshot request
-	var snapi, appliedi uint64
-	var nodes []uint64
 	var shouldstop bool
 	shouldstopC := s.sendhub.ShouldStopNotify()
 
+	// load initial state from raft storage
+	snap, err := s.raftStorage.Snapshot()
+	if err != nil {
+		log.Panicf("etcdserver: get snapshot from raft storage error: %v", err)
+	}
+	// snapi indicates the index of the last submitted snapshot request
+	snapi := snap.Metadata.Index
+	appliedi := snap.Metadata.Index
+	nodes := snap.Metadata.ConfState.Nodes
+
 	defer func() {
 		s.node.Stop()
 		s.sendhub.Stop()
@@ -432,24 +439,18 @@ func (s *EtcdServer) run() {
 
 			// recover from snapshot if it is more updated than current applied
 			if !raft.IsEmptySnap(rd.Snapshot) && rd.Snapshot.Metadata.Index > appliedi {
-				{
-					if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
-						log.Panicf("recovery store error: %v", err)
-					}
-					s.Cluster.Recover()
-					appliedi = rd.Snapshot.Metadata.Index
-					log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)
+				if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
+					log.Panicf("recovery store error: %v", err)
 				}
+				s.Cluster.Recover()
+				appliedi = rd.Snapshot.Metadata.Index
+				log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)
 			}
 			// TODO(bmizerany): do this in the background, but take
 			// care to apply entries in a single goroutine, and not
 			// race them.
 			if len(rd.CommittedEntries) != 0 {
 				firsti := rd.CommittedEntries[0].Index
-				if appliedi == 0 {
-					appliedi = firsti - 1
-					snapi = appliedi
-				}
 				if firsti > appliedi+1 {
 					log.Panicf("etcdserver: first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, appliedi)
 				}
@@ -457,8 +458,10 @@ func (s *EtcdServer) run() {
 				if appliedi+1-firsti < uint64(len(rd.CommittedEntries)) {
 					ents = rd.CommittedEntries[appliedi+1-firsti:]
 				}
-				if appliedi, shouldstop = s.apply(ents); shouldstop {
-					return
+				if len(ents) > 0 {
+					if appliedi, shouldstop = s.apply(ents); shouldstop {
+						return
+					}
 				}
 			}
 
@@ -695,8 +698,9 @@ func getExpirationTime(r *pb.Request) time.Time {
 	return t
 }
 
-// apply takes an Entry received from Raft (after it has been committed) and
-// applies it to the current state of the EtcdServer
+// apply takes entries received from Raft (after it has been committed) and
+// applies them to the current state of the EtcdServer.
+// The given entries should not be empty.
 func (s *EtcdServer) apply(es []raftpb.Entry) (uint64, bool) {
 	var applied uint64
 	for i := range es {

+ 3 - 1
etcdserver/server_test.go

@@ -1621,6 +1621,7 @@ func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte)
 type nodeConfChangeCommitterRecorder struct {
 	nodeRecorder
 	readyc chan raft.Ready
+	index  uint64
 }
 
 func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
@@ -1632,7 +1633,8 @@ func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context,
 	if err != nil {
 		return err
 	}
-	n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange, Data: data}}}
+	n.index++
+	n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: raftpb.EntryConfChange, Data: data}}}
 	n.record(action{name: "ProposeConfChange:" + conf.Type.String()})
 	return nil
 }