Browse Source

etcdserver: rework update committed index logic

fanmin shi 9 years ago
parent
commit
2faf72f47c
2 changed files with 23 additions and 11 deletions
  1. 15 0
      etcdserver/raft.go
  2. 8 11
      etcdserver/server.go

+ 15 - 0
etcdserver/raft.go

@@ -182,6 +182,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					raftDone: raftDone,
 				}
 
+				updateCommittedIndex(&ap, rh)
+
 				select {
 				case r.applyc <- ap:
 				case <-r.stopped:
@@ -231,6 +233,19 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 	}()
 }
 
+func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
+	var ci uint64
+	if len(ap.entries) != 0 {
+		ci = ap.entries[len(ap.entries)-1].Index
+	}
+	if ap.snapshot.Metadata.Index > ci {
+		ci = ap.snapshot.Metadata.Index
+	}
+	if ci != 0 {
+		rh.updateCommittedIndex(ci)
+	}
+}
+
 func (r *raftNode) sendMessages(ms []raftpb.Message) {
 	sentAppResp := false
 	for i := len(ms) - 1; i >= 0; i-- {

+ 8 - 11
etcdserver/server.go

@@ -598,7 +598,8 @@ type etcdProgress struct {
 // and helps decouple state machine logic from Raft algorithms.
 // TODO: add a state machine interface to apply the commit entries and do snapshot/recover
 type raftReadyHandler struct {
-	leadershipUpdate func()
+	leadershipUpdate     func()
+	updateCommittedIndex func(uint64)
 }
 
 func (s *EtcdServer) run() {
@@ -648,6 +649,12 @@ func (s *EtcdServer) run() {
 				s.r.td.Reset()
 			}
 		},
+		updateCommittedIndex: func(ci uint64) {
+			cci := s.getCommittedIndex()
+			if ci > cci {
+				s.setCommittedIndex(ci)
+			}
+		},
 	}
 	s.r.start(rh)
 
@@ -701,16 +708,6 @@ func (s *EtcdServer) run() {
 	for {
 		select {
 		case ap := <-s.r.apply():
-			var ci uint64
-			if len(ap.entries) != 0 {
-				ci = ap.entries[len(ap.entries)-1].Index
-			}
-			if ap.snapshot.Metadata.Index > ci {
-				ci = ap.snapshot.Metadata.Index
-			}
-			if ci != 0 {
-				s.setCommittedIndex(ci)
-			}
 			f := func(context.Context) { s.applyAll(&ep, &ap) }
 			sched.Schedule(f)
 		case leases := <-expiredLeaseC: