Browse Source

Merge pull request #1241 from coreos/compact

raft: compact takes index and nodes parameters
Xiang Li 11 years ago
parent
commit
3f3b9866c6
6 changed files with 97 additions and 35 deletions
  1. 17 15
      etcdserver/server.go
  2. 3 3
      etcdserver/server_test.go
  3. 22 9
      raft/node.go
  4. 4 3
      raft/node_test.go
  5. 7 4
      raft/raft.go
  6. 44 1
      raft/raft_test.go

+ 17 - 15
etcdserver/server.go

@@ -229,6 +229,7 @@ func (s *EtcdServer) run() {
 	var syncC <-chan time.Time
 	// snapi indicates the index of the last submitted snapshot request
 	var snapi, appliedi int64
+	var nodes []int64
 	for {
 		select {
 		case <-s.ticker:
@@ -265,6 +266,19 @@ func (s *EtcdServer) run() {
 				appliedi = e.Index
 			}
 
+			if rd.SoftState != nil {
+				nodes = rd.SoftState.Nodes
+				if rd.RaftState == raft.StateLeader {
+					syncC = s.syncTicker
+				} else {
+					syncC = nil
+				}
+				if rd.SoftState.ShouldStop {
+					s.Stop()
+					return
+				}
+			}
+
 			if rd.Snapshot.Index > snapi {
 				snapi = rd.Snapshot.Index
 			}
@@ -278,21 +292,9 @@ func (s *EtcdServer) run() {
 			}
 
 			if appliedi-snapi > s.snapCount {
-				s.snapshot()
+				s.snapshot(appliedi, nodes)
 				snapi = appliedi
 			}
-
-			if rd.SoftState != nil {
-				if rd.RaftState == raft.StateLeader {
-					syncC = s.syncTicker
-				} else {
-					syncC = nil
-				}
-				if rd.SoftState.ShouldStop {
-					s.Stop()
-					return
-				}
-			}
 		case <-syncC:
 			s.sync(defaultSyncTimeout)
 		case <-s.done:
@@ -517,14 +519,14 @@ func (s *EtcdServer) apply(r pb.Request) Response {
 }
 
 // TODO: non-blocking snapshot
-func (s *EtcdServer) snapshot() {
+func (s *EtcdServer) snapshot(snapi int64, snapnodes []int64) {
 	d, err := s.store.Save()
 	// TODO: current store will never fail to do a snapshot
 	// what should we do if the store might fail?
 	if err != nil {
 		panic("TODO: this is bad, what do we do about it?")
 	}
-	s.node.Compact(d)
+	s.node.Compact(snapi, snapnodes, d)
 	s.storage.Cut()
 }
 

+ 3 - 3
etcdserver/server_test.go

@@ -678,7 +678,7 @@ func TestSnapshot(t *testing.T) {
 		node:    n,
 	}
 
-	s.snapshot()
+	s.snapshot(0, []int64{1})
 	gaction := st.Action()
 	if len(gaction) != 1 {
 		t.Fatalf("len(action) = %d, want 1", len(gaction))
@@ -1129,7 +1129,7 @@ func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return
 func (n *readyNode) Ready() <-chan raft.Ready                           { return n.readyc }
 func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange)             {}
 func (n *readyNode) Stop()                                              {}
-func (n *readyNode) Compact(d []byte)                                   {}
+func (n *readyNode) Compact(index int64, nodes []int64, d []byte)       {}
 
 type nodeRecorder struct {
 	recorder
@@ -1161,7 +1161,7 @@ func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) {
 func (n *nodeRecorder) Stop() {
 	n.record(action{name: "Stop"})
 }
-func (n *nodeRecorder) Compact(d []byte) {
+func (n *nodeRecorder) Compact(index int64, nodes []int64, d []byte) {
 	n.record(action{name: "Compact"})
 }
 

+ 22 - 9
raft/node.go

@@ -3,6 +3,7 @@ package raft
 import (
 	"errors"
 	"log"
+	"reflect"
 
 	pb "github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
@@ -18,11 +19,12 @@ var (
 type SoftState struct {
 	Lead       int64
 	RaftState  StateType
+	Nodes      []int64
 	ShouldStop bool
 }
 
 func (a *SoftState) equal(b *SoftState) bool {
-	return a.Lead == b.Lead && a.RaftState == b.RaftState && a.ShouldStop == b.ShouldStop
+	return reflect.DeepEqual(a, b)
 }
 
 // Ready encapsulates the entries and messages that are ready to read,
@@ -56,6 +58,12 @@ type Ready struct {
 	Messages []pb.Message
 }
 
+type compact struct {
+	index int64
+	nodes []int64
+	data  []byte
+}
+
 func isHardStateEqual(a, b pb.HardState) bool {
 	return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit
 }
@@ -95,8 +103,13 @@ type Node interface {
 	ApplyConfChange(cc pb.ConfChange)
 	// Stop performs any necessary termination of the Node
 	Stop()
-	// Compact
-	Compact(d []byte)
+	// Compact discards the entire log up to the given index. It also
+	// generates a raft snapshot containing the given nodes configuration
+	// and the given snapshot data.
+	// It is the caller's responsibility to ensure the given configuration
+	// and snapshot data match the actual point-in-time configuration and snapshot
+	// at the given index.
+	Compact(index int64, nodes []int64, d []byte)
 }
 
 // StartNode returns a new Node given a unique raft id, a list of raft peers, and
@@ -141,7 +154,7 @@ func RestartNode(id int64, peers []int64, election, heartbeat int, snapshot *pb.
 type node struct {
 	propc    chan pb.Message
 	recvc    chan pb.Message
-	compactc chan []byte
+	compactc chan compact
 	confc    chan pb.ConfChange
 	readyc   chan Ready
 	tickc    chan struct{}
@@ -152,7 +165,7 @@ func newNode() node {
 	return node{
 		propc:    make(chan pb.Message),
 		recvc:    make(chan pb.Message),
-		compactc: make(chan []byte),
+		compactc: make(chan compact),
 		confc:    make(chan pb.ConfChange),
 		readyc:   make(chan Ready),
 		tickc:    make(chan struct{}),
@@ -200,8 +213,8 @@ func (n *node) run(r *raft) {
 			r.Step(m)
 		case m := <-n.recvc:
 			r.Step(m) // raft never returns an error
-		case d := <-n.compactc:
-			r.compact(d)
+		case c := <-n.compactc:
+			r.compact(c.index, c.nodes, c.data)
 		case cc := <-n.confc:
 			switch cc.Type {
 			case pb.ConfChangeAddNode:
@@ -299,9 +312,9 @@ func (n *node) ApplyConfChange(cc pb.ConfChange) {
 	}
 }
 
-func (n *node) Compact(d []byte) {
+func (n *node) Compact(index int64, nodes []int64, d []byte) {
 	select {
-	case n.compactc <- d:
+	case n.compactc <- compact{index, nodes, d}:
 	case <-n.done:
 	}
 }

+ 4 - 3
raft/node_test.go

@@ -156,7 +156,7 @@ func TestNode(t *testing.T) {
 	}
 	wants := []Ready{
 		{
-			SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
+			SoftState: &SoftState{Lead: 1, Nodes: []int64{1}, RaftState: StateLeader},
 			HardState: raftpb.HardState{Term: 1, Commit: 2},
 			Entries: []raftpb.Entry{
 				{},
@@ -221,7 +221,7 @@ func TestNodeRestart(t *testing.T) {
 
 // TestNodeCompact ensures Node.Compact creates a correct raft snapshot and compacts
 // the raft log (call raft.compact)
-func TestCompact(t *testing.T) {
+func TestNodeCompact(t *testing.T) {
 	ctx := context.Background()
 	n := newNode()
 	r := newRaft(1, []int64{1}, 0, 0)
@@ -244,7 +244,7 @@ func TestCompact(t *testing.T) {
 		t.Fatalf("unexpected proposal failure: unable to commit entry")
 	}
 
-	n.Compact(w.Data)
+	n.Compact(w.Index, w.Nodes, w.Data)
 	pkg.ForceGosched()
 	select {
 	case rd := <-n.Ready():
@@ -278,6 +278,7 @@ func TestSoftStateEqual(t *testing.T) {
 		{&SoftState{Lead: 1}, false},
 		{&SoftState{RaftState: StateLeader}, false},
 		{&SoftState{ShouldStop: true}, false},
+		{&SoftState{Nodes: []int64{1, 2}}, false},
 	}
 	for i, tt := range tests {
 		if g := tt.st.equal(&SoftState{}); g != tt.we {

+ 7 - 4
raft/raft.go

@@ -153,7 +153,7 @@ func (r *raft) hasLeader() bool { return r.lead != None }
 func (r *raft) shouldStop() bool { return r.removed[r.id] }
 
 func (r *raft) softState() *SoftState {
-	return &SoftState{Lead: r.lead, RaftState: r.state, ShouldStop: r.shouldStop()}
+	return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes(), ShouldStop: r.shouldStop()}
 }
 
 func (r *raft) String() string {
@@ -515,9 +515,12 @@ func stepFollower(r *raft, m pb.Message) {
 	}
 }
 
-func (r *raft) compact(d []byte) {
-	r.raftLog.snap(d, r.raftLog.applied, r.raftLog.term(r.raftLog.applied), r.nodes())
-	r.raftLog.compact(r.raftLog.applied)
+func (r *raft) compact(index int64, nodes []int64, d []byte) {
+	if index > r.raftLog.applied {
+		panic(fmt.Sprintf("raft: compact index (%d) exceeds applied index (%d)", index, r.raftLog.applied))
+	}
+	r.raftLog.snap(d, index, r.raftLog.term(index), nodes)
+	r.raftLog.compact(index)
 }
 
 // restore recovers the statemachine from a snapshot. It restores the log and the

+ 44 - 1
raft/raft_test.go

@@ -408,6 +408,49 @@ func TestProposalByProxy(t *testing.T) {
 	}
 }
 
+func TestCompact(t *testing.T) {
+	tests := []struct {
+		compacti int64
+		nodes    []int64
+		snapd    []byte
+		wpanic   bool
+	}{
+		{1, []int64{1, 2, 3}, []byte("some data"), false},
+		{2, []int64{1, 2, 3}, []byte("some data"), false},
+		{4, []int64{1, 2, 3}, []byte("some data"), true}, // compact out of range
+	}
+
+	for i, tt := range tests {
+		func() {
+			defer func() {
+				if r := recover(); r != nil {
+					if tt.wpanic != true {
+						t.Errorf("%d: panic = %v, want %v", i, false, true)
+					}
+				}
+			}()
+			sm := &raft{
+				state: StateLeader,
+				raftLog: &raftLog{
+					committed: 2,
+					applied:   2,
+					ents:      []pb.Entry{{}, {Term: 1}, {Term: 1}, {Term: 1}},
+				},
+			}
+			sm.compact(tt.compacti, tt.nodes, tt.snapd)
+			if sm.raftLog.offset != tt.compacti {
+				t.Errorf("%d: log.offset = %d, want %d", i, sm.raftLog.offset, tt.compacti)
+			}
+			if !reflect.DeepEqual(sm.raftLog.snapshot.Nodes, tt.nodes) {
+				t.Errorf("%d: snap.nodes = %v, want %v", i, sm.raftLog.snapshot.Nodes, tt.nodes)
+			}
+			if !reflect.DeepEqual(sm.raftLog.snapshot.Data, tt.snapd) {
+				t.Errorf("%d: snap.data = %v, want %v", i, sm.raftLog.snapshot.Data, tt.snapd)
+			}
+		}()
+	}
+}
+
 func TestCommit(t *testing.T) {
 	tests := []struct {
 		matches []int64
@@ -897,7 +940,7 @@ func TestSlowNodeRestore(t *testing.T) {
 	}
 	lead := nt.peers[1].(*raft)
 	nextEnts(lead)
-	lead.compact(nil)
+	lead.compact(lead.raftLog.applied, lead.nodes(), nil)
 
 	nt.recover()
 	// trigger a snapshot