|
|
@@ -261,7 +261,11 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- var remotes []*membership.Member
|
|
|
+ var (
|
|
|
+ remotes []*membership.Member
|
|
|
+ snapshot *raftpb.Snapshot
|
|
|
+ )
|
|
|
+
|
|
|
switch {
|
|
|
case !haveWAL && !cfg.NewCluster:
|
|
|
if err = cfg.VerifyJoinExisting(); err != nil {
|
|
|
@@ -334,7 +338,6 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
|
|
if cfg.ShouldDiscover() {
|
|
|
plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
|
|
|
}
|
|
|
- var snapshot *raftpb.Snapshot
|
|
|
snapshot, err = ss.Load()
|
|
|
if err != nil && err != snap.ErrNoSnapshot {
|
|
|
return nil, err
|
|
|
@@ -402,6 +405,12 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
|
|
srv.be = be
|
|
|
srv.lessor = lease.NewLessor(srv.be)
|
|
|
srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
|
|
|
+ if beExist {
|
|
|
+ kvindex := srv.kv.ConsistentIndex()
|
|
|
+ if snapshot != nil && kvindex < snapshot.Metadata.Index {
|
|
|
+ return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d).", bepath, kvindex, snapshot.Metadata.Index)
|
|
|
+ }
|
|
|
+ }
|
|
|
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
|
|
|
srv.authStore = auth.NewAuthStore(srv.be)
|
|
|
if h := cfg.AutoCompactionRetention; h != 0 {
|
|
|
@@ -1033,6 +1042,13 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
|
|
|
|
|
|
// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
|
|
|
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
|
|
+ shouldApplyV3 := false
|
|
|
+ if e.Index > s.consistIndex.ConsistentIndex() {
|
|
|
+ // set the consistent index of current executing entry
|
|
|
+ s.consistIndex.setConsistentIndex(e.Index)
|
|
|
+ shouldApplyV3 = true
|
|
|
+ }
|
|
|
+
|
|
|
// raft state machine may generate noop entry when leader confirmation.
|
|
|
// skip it in advance to avoid some potential bug in the future
|
|
|
if len(e.Data) == 0 {
|
|
|
@@ -1057,7 +1073,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
|
|
}
|
|
|
|
|
|
// do not re-apply applied entries.
|
|
|
- if e.Index <= s.consistIndex.ConsistentIndex() {
|
|
|
+ if !shouldApplyV3 {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
@@ -1066,8 +1082,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
|
|
id = raftReq.Header.ID
|
|
|
}
|
|
|
|
|
|
- // set the consistent index of current executing entry
|
|
|
- s.consistIndex.setConsistentIndex(e.Index)
|
|
|
ar := s.applyV3.Apply(&raftReq)
|
|
|
s.setAppliedIndex(e.Index)
|
|
|
if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
|