Browse Source

Merge pull request #1719 from yichengq/228

etcdserver: recover snapshot before applying committed entries
Yicheng Qin 11 years ago
parent
commit
b6887e4a83
7 changed files with 143 additions and 19 deletions
  1. 4 0
      etcdserver/cluster.go
  2. 21 11
      etcdserver/server.go
  3. 48 1
      etcdserver/server_test.go
  4. 5 4
      raft/log.go
  5. 31 3
      raft/log_test.go
  6. 1 0
      raft/node.go
  7. 33 0
      raft/node_test.go

+ 4 - 0
etcdserver/cluster.go

@@ -267,6 +267,10 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
 
 
 func (c *Cluster) SetStore(st store.Store) { c.store = st }
 func (c *Cluster) SetStore(st store.Store) { c.store = st }
 
 
+func (c *Cluster) Recover() {
+	c.members, c.removed = membersFromStore(c.store)
+}
+
 // ValidateConfigurationChange takes a proposed ConfChange and
 // ValidateConfigurationChange takes a proposed ConfChange and
 // ensures that it is still valid.
 // ensures that it is still valid.
 func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
 func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {

+ 21 - 11
etcdserver/server.go

@@ -349,27 +349,37 @@ func (s *EtcdServer) run() {
 			}
 			}
 			s.sender.Send(rd.Messages)
 			s.sender.Send(rd.Messages)
 
 
-			// TODO(bmizerany): do this in the background, but take
-			// care to apply entries in a single goroutine, and not
-			// race them.
-			if len(rd.CommittedEntries) != 0 {
-				appliedi = s.apply(rd.CommittedEntries)
-			}
-
-			if rd.Snapshot.Index > snapi {
-				snapi = rd.Snapshot.Index
-			}
-
 			// recover from the snapshot if it is more up to date than the currently applied index
 			// recover from the snapshot if it is more up to date than the currently applied index
 			if rd.Snapshot.Index > appliedi {
 			if rd.Snapshot.Index > appliedi {
 				if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
 				if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
 					log.Panicf("recovery store error: %v", err)
 					log.Panicf("recovery store error: %v", err)
 				}
 				}
+				s.Cluster.Recover()
 				appliedi = rd.Snapshot.Index
 				appliedi = rd.Snapshot.Index
 			}
 			}
+			// TODO(bmizerany): do this in the background, but take
+			// care to apply entries in a single goroutine, and not
+			// race them.
+			if len(rd.CommittedEntries) != 0 {
+				firsti := rd.CommittedEntries[0].Index
+				if appliedi == 0 {
+					appliedi = firsti - 1
+				}
+				if firsti > appliedi+1 {
+					log.Panicf("etcdserver: first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, appliedi)
+				}
+				var ents []raftpb.Entry
+				if appliedi+1-firsti < uint64(len(rd.CommittedEntries)) {
+					ents = rd.CommittedEntries[appliedi+1-firsti:]
+				}
+				appliedi = s.apply(ents)
+			}
 
 
 			s.node.Advance()
 			s.node.Advance()
 
 
+			if rd.Snapshot.Index > snapi {
+				snapi = rd.Snapshot.Index
+			}
 			if appliedi-snapi > s.snapCount {
 			if appliedi-snapi > s.snapCount {
 				s.snapshot(appliedi, nodes)
 				s.snapshot(appliedi, nodes)
 				snapi = appliedi
 				snapi = appliedi

+ 48 - 1
etcdserver/server_test.go

@@ -31,6 +31,7 @@ import (
 
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft"
@@ -900,11 +901,14 @@ func TestRecvSnapshot(t *testing.T) {
 	n := newReadyNode()
 	n := newReadyNode()
 	st := &storeRecorder{}
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	p := &storageRecorder{}
+	cl := newCluster("abc")
+	cl.SetStore(store.New())
 	s := &EtcdServer{
 	s := &EtcdServer{
 		store:   st,
 		store:   st,
 		sender:  &nopSender{},
 		sender:  &nopSender{},
 		storage: p,
 		storage: p,
 		node:    n,
 		node:    n,
+		Cluster: cl,
 	}
 	}
 
 
 	s.start()
 	s.start()
@@ -924,15 +928,19 @@ func TestRecvSnapshot(t *testing.T) {
 }
 }
 
 
 // TestRecvSlowSnapshot tests that slow snapshot will not be applied
 // TestRecvSlowSnapshot tests that slow snapshot will not be applied
-// to store.
+// to store. This can happen when the server compacts the log and
+// raft returns the compacted snapshot.
 func TestRecvSlowSnapshot(t *testing.T) {
 func TestRecvSlowSnapshot(t *testing.T) {
 	n := newReadyNode()
 	n := newReadyNode()
 	st := &storeRecorder{}
 	st := &storeRecorder{}
+	cl := newCluster("abc")
+	cl.SetStore(store.New())
 	s := &EtcdServer{
 	s := &EtcdServer{
 		store:   st,
 		store:   st,
 		sender:  &nopSender{},
 		sender:  &nopSender{},
 		storage: &storageRecorder{},
 		storage: &storageRecorder{},
 		node:    n,
 		node:    n,
+		Cluster: cl,
 	}
 	}
 
 
 	s.start()
 	s.start()
@@ -951,6 +959,45 @@ func TestRecvSlowSnapshot(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// TestApplySnapshotAndCommittedEntries tests that server applies snapshot
+// first and then committed entries.
+func TestApplySnapshotAndCommittedEntries(t *testing.T) {
+	n := newReadyNode()
+	st := &storeRecorder{}
+	cl := newCluster("abc")
+	cl.SetStore(store.New())
+	s := &EtcdServer{
+		store:   st,
+		sender:  &nopSender{},
+		storage: &storageRecorder{},
+		node:    n,
+		Cluster: cl,
+	}
+
+	s.start()
+	req := &pb.Request{Method: "QGET"}
+	n.readyc <- raft.Ready{
+		Snapshot: raftpb.Snapshot{Index: 1},
+		CommittedEntries: []raftpb.Entry{
+			{Index: 2, Data: pbutil.MustMarshal(req)},
+		},
+	}
+	// make goroutines move forward to receive snapshot
+	testutil.ForceGosched()
+	s.Stop()
+
+	actions := st.Action()
+	if len(actions) != 2 {
+		t.Fatalf("len(action) = %d, want 2", len(actions))
+	}
+	if actions[0].name != "Recovery" {
+		t.Errorf("actions[0] = %s, want %s", actions[0].name, "Recovery")
+	}
+	if actions[1].name != "Get" {
+		t.Errorf("actions[1] = %s, want %s", actions[1].name, "Get")
+	}
+}
+
 // TestAddMember tests AddMember can propose and perform node addition.
 // TestAddMember tests AddMember can propose and perform node addition.
 func TestAddMember(t *testing.T) {
 func TestAddMember(t *testing.T) {
 	n := newNodeConfChangeCommitterRecorder()
 	n := newNodeConfChangeCommitterRecorder()

+ 5 - 4
raft/log.go

@@ -115,10 +115,12 @@ func (l *raftLog) unstableEnts() []pb.Entry {
 }
 }
 
 
 // nextEnts returns all the available entries for execution.
 // nextEnts returns all the available entries for execution.
-// all the returned entries will be marked as applied.
+// If applied is smaller than the index of snapshot, it returns all committed
+// entries after the index of snapshot.
 func (l *raftLog) nextEnts() (ents []pb.Entry) {
 func (l *raftLog) nextEnts() (ents []pb.Entry) {
-	if l.committed > l.applied {
-		return l.slice(l.applied+1, l.committed+1)
+	off := max(l.applied, l.snapshot.Index)
+	if l.committed > off {
+		return l.slice(off+1, l.committed+1)
 	}
 	}
 	return nil
 	return nil
 }
 }
@@ -211,7 +213,6 @@ func (l *raftLog) restore(s pb.Snapshot) {
 	l.ents = []pb.Entry{{Term: s.Term}}
 	l.ents = []pb.Entry{{Term: s.Term}}
 	l.unstable = s.Index + 1
 	l.unstable = s.Index + 1
 	l.committed = s.Index
 	l.committed = s.Index
-	l.applied = s.Index
 	l.offset = s.Index
 	l.offset = s.Index
 	l.snapshot = s
 	l.snapshot = s
 }
 }

+ 31 - 3
raft/log_test.go

@@ -326,6 +326,37 @@ func TestCompactionSideEffects(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestNextEnts(t *testing.T) {
+	snap := pb.Snapshot{Term: 1, Index: 3}
+	ents := []pb.Entry{
+		{Term: 1, Index: 3},
+		{Term: 1, Index: 4},
+		{Term: 1, Index: 5},
+		{Term: 1, Index: 6},
+	}
+	tests := []struct {
+		applied uint64
+		wents   []pb.Entry
+	}{
+		{0, ents[1:3]},
+		{3, ents[1:3]},
+		{4, ents[2:3]},
+		{5, nil},
+	}
+	for i, tt := range tests {
+		raftLog := newLog()
+		raftLog.restore(snap)
+		raftLog.load(ents)
+		raftLog.maybeCommit(5, 1)
+		raftLog.appliedTo(tt.applied)
+
+		ents := raftLog.nextEnts()
+		if !reflect.DeepEqual(ents, tt.wents) {
+			t.Errorf("#%d: ents = %+v, want %+v", i, ents, tt.wents)
+		}
+	}
+}
+
 func TestUnstableEnts(t *testing.T) {
 func TestUnstableEnts(t *testing.T) {
 	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
 	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
 	tests := []struct {
 	tests := []struct {
@@ -435,9 +466,6 @@ func TestLogRestore(t *testing.T) {
 	if raftLog.offset != index {
 	if raftLog.offset != index {
 		t.Errorf("offset = %d, want %d", raftLog.offset, index)
 		t.Errorf("offset = %d, want %d", raftLog.offset, index)
 	}
 	}
-	if raftLog.applied != index {
-		t.Errorf("applied = %d, want %d", raftLog.applied, index)
-	}
 	if raftLog.committed != index {
 	if raftLog.committed != index {
 		t.Errorf("comitted = %d, want %d", raftLog.committed, index)
 		t.Errorf("comitted = %d, want %d", raftLog.committed, index)
 	}
 	}

+ 1 - 0
raft/node.go

@@ -171,6 +171,7 @@ func RestartNode(id uint64, election, heartbeat int, snapshot *pb.Snapshot, st p
 	r := newRaft(id, nil, election, heartbeat)
 	r := newRaft(id, nil, election, heartbeat)
 	if snapshot != nil {
 	if snapshot != nil {
 		r.restore(*snapshot)
 		r.restore(*snapshot)
+		r.raftLog.appliedTo(snapshot.Index)
 	}
 	}
 	if !isHardStateEqual(st, emptyState) {
 	if !isHardStateEqual(st, emptyState) {
 		r.loadState(st)
 		r.loadState(st)

+ 33 - 0
raft/node_test.go

@@ -368,6 +368,39 @@ func TestNodeRestart(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestNodeRestartFromSnapshot(t *testing.T) {
+	snap := &raftpb.Snapshot{
+		Data:  []byte("some data"),
+		Nodes: []uint64{1, 2},
+		Index: 2,
+		Term:  1,
+	}
+	entries := []raftpb.Entry{
+		{Term: 1, Index: 2},
+		{Term: 1, Index: 3, Data: []byte("foo")},
+	}
+	st := raftpb.HardState{Term: 1, Commit: 3}
+
+	want := Ready{
+		HardState: emptyState,
+		// commit up to the commit index in st
+		CommittedEntries: entries[1:],
+	}
+
+	n := RestartNode(1, 10, 1, snap, st, entries)
+	if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
+		t.Errorf("g = %+v,\n             w   %+v", g, want)
+	} else {
+		n.Advance()
+	}
+
+	select {
+	case rd := <-n.Ready():
+		t.Errorf("unexpected Ready: %+v", rd)
+	case <-time.After(time.Millisecond):
+	}
+}
+
 // TestNodeCompact ensures Node.Compact creates a correct raft snapshot and compacts
 // TestNodeCompact ensures Node.Compact creates a correct raft snapshot and compacts
 // the raft log (calls raft.compact)
 // the raft log (calls raft.compact)
 func TestNodeCompact(t *testing.T) {
 func TestNodeCompact(t *testing.T) {