Browse Source

raft: compact takes index and nodes parameters
Before this commit, compact always compacted the log at raft's current applied index.
This prevented us from doing non-blocking snapshots, since we had to take the snapshot
and compact atomically. To prepare for non-blocking snapshots, this commit makes
compact accept index and nodes parameters. After completing a snapshot, the applier
should call compact with the snapshot index and the nodes at the snapshot index to
compact the log at the snapshot index.

Xiang Li 11 years ago
parent
commit
5587e0d73f
6 changed files with 48 additions and 32 deletions
  1. 17 15
      etcdserver/server.go
  2. 3 3
      etcdserver/server_test.go
  3. 17 8
      raft/node.go
  4. 3 2
      raft/node_test.go
  5. 7 3
      raft/raft.go
  6. 1 1
      raft/raft_test.go

+ 17 - 15
etcdserver/server.go

@@ -229,6 +229,7 @@ func (s *EtcdServer) run() {
 	var syncC <-chan time.Time
 	// snapi indicates the index of the last submitted snapshot request
 	var snapi, appliedi int64
+	var nodes []int64
 	for {
 		select {
 		case <-s.ticker:
@@ -265,6 +266,19 @@ func (s *EtcdServer) run() {
 				appliedi = e.Index
 			}
 
+			if rd.SoftState != nil {
+				nodes = rd.SoftState.Nodes
+				if rd.RaftState == raft.StateLeader {
+					syncC = s.syncTicker
+				} else {
+					syncC = nil
+				}
+				if rd.SoftState.ShouldStop {
+					s.Stop()
+					return
+				}
+			}
+
 			if rd.Snapshot.Index > snapi {
 				snapi = rd.Snapshot.Index
 			}
@@ -278,21 +292,9 @@ func (s *EtcdServer) run() {
 			}
 
 			if appliedi-snapi > s.snapCount {
-				s.snapshot()
+				s.snapshot(appliedi, nodes)
 				snapi = appliedi
 			}
-
-			if rd.SoftState != nil {
-				if rd.RaftState == raft.StateLeader {
-					syncC = s.syncTicker
-				} else {
-					syncC = nil
-				}
-				if rd.SoftState.ShouldStop {
-					s.Stop()
-					return
-				}
-			}
 		case <-syncC:
 			s.sync(defaultSyncTimeout)
 		case <-s.done:
@@ -517,14 +519,14 @@ func (s *EtcdServer) apply(r pb.Request) Response {
 }
 
 // TODO: non-blocking snapshot
-func (s *EtcdServer) snapshot() {
+func (s *EtcdServer) snapshot(snapi int64, snapnodes []int64) {
 	d, err := s.store.Save()
 	// TODO: current store will never fail to do a snapshot
 	// what should we do if the store might fail?
 	if err != nil {
 		panic("TODO: this is bad, what do we do about it?")
 	}
-	s.node.Compact(d)
+	s.node.Compact(snapi, snapnodes, d)
 	s.storage.Cut()
 }
 

+ 3 - 3
etcdserver/server_test.go

@@ -678,7 +678,7 @@ func TestSnapshot(t *testing.T) {
 		node:    n,
 	}
 
-	s.snapshot()
+	s.snapshot(0, []int64{1})
 	gaction := st.Action()
 	if len(gaction) != 1 {
 		t.Fatalf("len(action) = %d, want 1", len(gaction))
@@ -1129,7 +1129,7 @@ func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return
 func (n *readyNode) Ready() <-chan raft.Ready                           { return n.readyc }
 func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange)             {}
 func (n *readyNode) Stop()                                              {}
-func (n *readyNode) Compact(d []byte)                                   {}
+func (n *readyNode) Compact(index int64, nodes []int64, d []byte)       {}
 
 type nodeRecorder struct {
 	recorder
@@ -1161,7 +1161,7 @@ func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) {
 func (n *nodeRecorder) Stop() {
 	n.record(action{name: "Stop"})
 }
-func (n *nodeRecorder) Compact(d []byte) {
+func (n *nodeRecorder) Compact(index int64, nodes []int64, d []byte) {
 	n.record(action{name: "Compact"})
 }
 

+ 17 - 8
raft/node.go

@@ -3,6 +3,7 @@ package raft
 import (
 	"errors"
 	"log"
+	"reflect"
 
 	pb "github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
@@ -18,11 +19,13 @@ var (
 type SoftState struct {
 	Lead       int64
 	RaftState  StateType
+	Nodes      []int64
 	ShouldStop bool
 }
 
 func (a *SoftState) equal(b *SoftState) bool {
-	return a.Lead == b.Lead && a.RaftState == b.RaftState && a.ShouldStop == b.ShouldStop
+	nodeeq := reflect.DeepEqual(a.Nodes, b.Nodes)
+	return a.Lead == b.Lead && a.RaftState == b.RaftState && a.ShouldStop == b.ShouldStop && nodeeq
 }
 
 // Ready encapsulates the entries and messages that are ready to read,
@@ -56,6 +59,12 @@ type Ready struct {
 	Messages []pb.Message
 }
 
+type compact struct {
+	index int64
+	nodes []int64
+	data  []byte
+}
+
 func isHardStateEqual(a, b pb.HardState) bool {
 	return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit
 }
@@ -96,7 +105,7 @@ type Node interface {
 	// Stop performs any necessary termination of the Node
 	Stop()
 	// Compact
-	Compact(d []byte)
+	Compact(index int64, nodes []int64, d []byte)
 }
 
 // StartNode returns a new Node given a unique raft id, a list of raft peers, and
@@ -141,7 +150,7 @@ func RestartNode(id int64, peers []int64, election, heartbeat int, snapshot *pb.
 type node struct {
 	propc    chan pb.Message
 	recvc    chan pb.Message
-	compactc chan []byte
+	compactc chan compact
 	confc    chan pb.ConfChange
 	readyc   chan Ready
 	tickc    chan struct{}
@@ -152,7 +161,7 @@ func newNode() node {
 	return node{
 		propc:    make(chan pb.Message),
 		recvc:    make(chan pb.Message),
-		compactc: make(chan []byte),
+		compactc: make(chan compact),
 		confc:    make(chan pb.ConfChange),
 		readyc:   make(chan Ready),
 		tickc:    make(chan struct{}),
@@ -200,8 +209,8 @@ func (n *node) run(r *raft) {
 			r.Step(m)
 		case m := <-n.recvc:
 			r.Step(m) // raft never returns an error
-		case d := <-n.compactc:
-			r.compact(d)
+		case c := <-n.compactc:
+			r.compact(c.index, c.nodes, c.data)
 		case cc := <-n.confc:
 			switch cc.Type {
 			case pb.ConfChangeAddNode:
@@ -299,9 +308,9 @@ func (n *node) ApplyConfChange(cc pb.ConfChange) {
 	}
 }
 
-func (n *node) Compact(d []byte) {
+func (n *node) Compact(index int64, nodes []int64, d []byte) {
 	select {
-	case n.compactc <- d:
+	case n.compactc <- compact{index, nodes, d}:
 	case <-n.done:
 	}
 }

+ 3 - 2
raft/node_test.go

@@ -156,7 +156,7 @@ func TestNode(t *testing.T) {
 	}
 	wants := []Ready{
 		{
-			SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
+			SoftState: &SoftState{Lead: 1, Nodes: []int64{1}, RaftState: StateLeader},
 			HardState: raftpb.HardState{Term: 1, Commit: 2},
 			Entries: []raftpb.Entry{
 				{},
@@ -244,7 +244,7 @@ func TestCompact(t *testing.T) {
 		t.Fatalf("unexpected proposal failure: unable to commit entry")
 	}
 
-	n.Compact(w.Data)
+	n.Compact(w.Index, w.Nodes, w.Data)
 	pkg.ForceGosched()
 	select {
 	case rd := <-n.Ready():
@@ -278,6 +278,7 @@ func TestSoftStateEqual(t *testing.T) {
 		{&SoftState{Lead: 1}, false},
 		{&SoftState{RaftState: StateLeader}, false},
 		{&SoftState{ShouldStop: true}, false},
+		{&SoftState{Nodes: []int64{1, 2}}, false},
 	}
 	for i, tt := range tests {
 		if g := tt.st.equal(&SoftState{}); g != tt.we {

+ 7 - 3
raft/raft.go

@@ -153,7 +153,7 @@ func (r *raft) hasLeader() bool { return r.lead != None }
 func (r *raft) shouldStop() bool { return r.removed[r.id] }
 
 func (r *raft) softState() *SoftState {
-	return &SoftState{Lead: r.lead, RaftState: r.state, ShouldStop: r.shouldStop()}
+	return &SoftState{Lead: r.lead, Nodes: r.nodes(), RaftState: r.state, ShouldStop: r.shouldStop()}
 }
 
 func (r *raft) String() string {
@@ -515,9 +515,13 @@ func stepFollower(r *raft, m pb.Message) {
 	}
 }
 
-func (r *raft) compact(d []byte) {
-	r.raftLog.snap(d, r.raftLog.applied, r.raftLog.term(r.raftLog.applied), r.nodes())
+func (r *raft) compact(index int64, nodes []int64, d []byte) error {
+	if index > r.raftLog.applied {
+		return fmt.Errorf("raft: compact index (%d) exceeds applied index (%d)", index, r.raftLog.applied)
+	}
+	r.raftLog.snap(d, r.raftLog.applied, r.raftLog.term(r.raftLog.applied), nodes)
 	r.raftLog.compact(r.raftLog.applied)
+	return nil
 }
 
 // restore recovers the statemachine from a snapshot. It restores the log and the

+ 1 - 1
raft/raft_test.go

@@ -897,7 +897,7 @@ func TestSlowNodeRestore(t *testing.T) {
 	}
 	lead := nt.peers[1].(*raft)
 	nextEnts(lead)
-	lead.compact(nil)
+	lead.compact(lead.raftLog.applied, lead.nodes(), nil)
 
 	nt.recover()
 	// trigger a snapshot