Browse Source

Merge pull request #5780 from xiang90/check_i

etcdserver: check index of the kv when restarting
Xiang Li 9 years ago
parent
commit
bbed3ecc8d
2 changed files with 29 additions and 6 deletions
  1. 19 5
      etcdserver/server.go
  2. 10 1
      mvcc/kvstore.go

+ 19 - 5
etcdserver/server.go

@@ -261,7 +261,11 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 	if err != nil {
 		return nil, err
 	}
-	var remotes []*membership.Member
+	var (
+		remotes  []*membership.Member
+		snapshot *raftpb.Snapshot
+	)
+
 	switch {
 	case !haveWAL && !cfg.NewCluster:
 		if err = cfg.VerifyJoinExisting(); err != nil {
@@ -334,7 +338,6 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 		if cfg.ShouldDiscover() {
 			plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
 		}
-		var snapshot *raftpb.Snapshot
 		snapshot, err = ss.Load()
 		if err != nil && err != snap.ErrNoSnapshot {
 			return nil, err
@@ -402,6 +405,12 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 	srv.be = be
 	srv.lessor = lease.NewLessor(srv.be)
 	srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
+	if beExist {
+		kvindex := srv.kv.ConsistentIndex()
+		if snapshot != nil && kvindex < snapshot.Metadata.Index {
+			return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d).", bepath, kvindex, snapshot.Metadata.Index)
+		}
+	}
 	srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
 	srv.authStore = auth.NewAuthStore(srv.be)
 	if h := cfg.AutoCompactionRetention; h != 0 {
@@ -1033,6 +1042,13 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
 
 // applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
 func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
+	shouldApplyV3 := false
+	if e.Index > s.consistIndex.ConsistentIndex() {
+		// set the consistent index of current executing entry
+		s.consistIndex.setConsistentIndex(e.Index)
+		shouldApplyV3 = true
+	}
+
 	// raft state machine may generate noop entry when leader confirmation.
 	// skip it in advance to avoid some potential bug in the future
 	if len(e.Data) == 0 {
@@ -1057,7 +1073,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 	}
 
 	// do not re-apply applied entries.
-	if e.Index <= s.consistIndex.ConsistentIndex() {
+	if !shouldApplyV3 {
 		return
 	}
 
@@ -1066,8 +1082,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		id = raftReq.Header.ID
 	}
 
-	// set the consistent index of current executing entry
-	s.consistIndex.setConsistentIndex(e.Index)
 	ar := s.applyV3.Apply(&raftReq)
 	s.setAppliedIndex(e.Index)
 	if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {

+ 10 - 1
mvcc/kvstore.go

@@ -325,7 +325,16 @@ func (s *store) Hash() (uint32, int64, error) {
 	return h, rev, err
 }
 
-func (s *store) Commit() { s.b.ForceCommit() }
+func (s *store) Commit() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.tx = s.b.BatchTx()
+	s.tx.Lock()
+	s.saveIndex()
+	s.tx.Unlock()
+	s.b.ForceCommit()
+}
 
 func (s *store) Restore(b backend.Backend) error {
 	s.mu.Lock()