Browse Source

etcdserver: remove possibly compacted entry look-up

Fix https://github.com/coreos/etcd/issues/7470.

This patch removes the unnecessary term look-up in
'createMergedSnapshotMessage', which can trigger a panic
if the raft entry at etcdProgress.appliedi has been compacted
by subsequent 'MsgSnap' messages--if a follower is
slow (in this case, due to network latency spikes), it
could receive subsequent 'MsgSnap' requests from the leader.

The etcd server-side 'applyAll' routine and raft's Ready
processing routine become asynchronous after raft
entries are persisted. Given that the raft Ready routine
takes less time to finish, it is possible that a second
'MsgSnap' is being handled while the slow 'applyAll'
is still processing the first (old) 'MsgSnap'. The raft
Ready routine can then compact log entries at an index that
is still in the future for 'applyAll'. That is how
'createMergedSnapshotMessage' ended up looking up the raft
term with an outdated etcdProgress.appliedi.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
Gyu-Ho Lee 8 years ago
parent
commit
80c10e150f
3 changed files with 12 additions and 16 deletions
  1. 10 8
      etcdserver/server.go
  2. 1 1
      etcdserver/server_test.go
  3. 1 7
      etcdserver/snapshot_merge.go

+ 10 - 8
etcdserver/server.go

@@ -599,6 +599,7 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
 type etcdProgress struct {
 type etcdProgress struct {
 	confState raftpb.ConfState
 	confState raftpb.ConfState
 	snapi     uint64
 	snapi     uint64
+	appliedt  uint64
 	appliedi  uint64
 	appliedi  uint64
 }
 }
 
 
@@ -676,6 +677,7 @@ func (s *EtcdServer) run() {
 	ep := etcdProgress{
 	ep := etcdProgress{
 		confState: sn.Metadata.ConfState,
 		confState: sn.Metadata.ConfState,
 		snapi:     sn.Metadata.Index,
 		snapi:     sn.Metadata.Index,
+		appliedt:  sn.Metadata.Term,
 		appliedi:  sn.Metadata.Index,
 		appliedi:  sn.Metadata.Index,
 	}
 	}
 
 
@@ -777,7 +779,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
 	select {
 	select {
 	// snapshot requested via send()
 	// snapshot requested via send()
 	case m := <-s.r.msgSnapC:
 	case m := <-s.r.msgSnapC:
-		merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
+		merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
 		s.sendMergedSnap(merged)
 		s.sendMergedSnap(merged)
 	default:
 	default:
 	}
 	}
@@ -879,6 +881,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	}
 	}
 	plog.Info("finished adding peers from new cluster configuration into network...")
 	plog.Info("finished adding peers from new cluster configuration into network...")
 
 
+	ep.appliedt = apply.snapshot.Metadata.Term
 	ep.appliedi = apply.snapshot.Metadata.Index
 	ep.appliedi = apply.snapshot.Metadata.Index
 	ep.snapi = ep.appliedi
 	ep.snapi = ep.appliedi
 	ep.confState = apply.snapshot.Metadata.ConfState
 	ep.confState = apply.snapshot.Metadata.ConfState
@@ -900,7 +903,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
 		return
 		return
 	}
 	}
 	var shouldstop bool
 	var shouldstop bool
-	if ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
+	if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
 		go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
 		go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
 	}
 	}
 }
 }
@@ -1254,9 +1257,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
 // apply takes entries received from Raft (after it has been committed) and
 // apply takes entries received from Raft (after it has been committed) and
 // applies them to the current state of the EtcdServer.
 // applies them to the current state of the EtcdServer.
 // The given entries should not be empty.
 // The given entries should not be empty.
-func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint64, bool) {
-	var applied uint64
-	var shouldstop bool
+func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) {
 	for i := range es {
 	for i := range es {
 		e := es[i]
 		e := es[i]
 		switch e.Type {
 		switch e.Type {
@@ -1266,16 +1267,17 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
 			var cc raftpb.ConfChange
 			var cc raftpb.ConfChange
 			pbutil.MustUnmarshal(&cc, e.Data)
 			pbutil.MustUnmarshal(&cc, e.Data)
 			removedSelf, err := s.applyConfChange(cc, confState)
 			removedSelf, err := s.applyConfChange(cc, confState)
-			shouldstop = shouldstop || removedSelf
+			shouldStop = shouldStop || removedSelf
 			s.w.Trigger(cc.ID, err)
 			s.w.Trigger(cc.ID, err)
 		default:
 		default:
 			plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
 			plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
 		}
 		}
 		atomic.StoreUint64(&s.r.index, e.Index)
 		atomic.StoreUint64(&s.r.index, e.Index)
 		atomic.StoreUint64(&s.r.term, e.Term)
 		atomic.StoreUint64(&s.r.term, e.Term)
-		applied = e.Index
+		appliedt = e.Term
+		appliedi = e.Index
 	}
 	}
-	return applied, shouldstop
+	return appliedt, appliedi, shouldStop
 }
 }
 
 
 // applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
 // applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer

+ 1 - 1
etcdserver/server_test.go

@@ -615,7 +615,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
 		ents = append(ents, ent)
 		ents = append(ents, ent)
 	}
 	}
 
 
-	_, shouldStop := srv.apply(ents, &raftpb.ConfState{})
+	_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
 	if !shouldStop {
 	if !shouldStop {
 		t.Errorf("shouldStop = %t, want %t", shouldStop, true)
 		t.Errorf("shouldStop = %t, want %t", shouldStop, true)
 	}
 	}

+ 1 - 7
etcdserver/snapshot_merge.go

@@ -16,7 +16,6 @@ package etcdserver
 
 
 import (
 import (
 	"io"
 	"io"
-	"log"
 
 
 	"github.com/coreos/etcd/mvcc/backend"
 	"github.com/coreos/etcd/mvcc/backend"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
@@ -26,12 +25,7 @@ import (
 // createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
 // createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
 // a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
 // a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
 // as ReadCloser.
 // as ReadCloser.
-func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, confState raftpb.ConfState) snap.Message {
-	snapt, err := s.r.raftStorage.Term(snapi)
-	if err != nil {
-		log.Panicf("get term should never fail: %v", err)
-	}
-
+func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
 	// get a snapshot of v2 store as []byte
 	// get a snapshot of v2 store as []byte
 	clone := s.store.Clone()
 	clone := s.store.Clone()
 	d, err := clone.SaveNoCopy()
 	d, err := clone.SaveNoCopy()