Bladeren bron

etcd/raft: add snap

Xiang Li 11 jaren geleden
bovenliggende
commit
6030261363
6 gewijzigde bestanden met toevoegingen van 49 en 45 verwijderingen
  1. 18 0
      etcd/etcd_test.go
  2. 9 0
      etcd/participant.go
  3. 5 0
      raft/log.go
  4. 12 0
      raft/node.go
  5. 5 0
      raft/raft.go
  6. 0 45
      raft/raft_test.go

+ 18 - 0
etcd/etcd_test.go

@@ -368,6 +368,24 @@ func TestSingleNodeRecovery(t *testing.T) {
 	destroyServer(t, e, h)
 }
 
+func TestTakingSnapshot(t *testing.T) {
+	es, hs := buildCluster(1, false)
+	for i := 0; i < defaultCompact; i++ {
+		es[0].p.Set("/foo", false, "bar", store.Permanent)
+	}
+	snap := es[0].p.node.GetSnap()
+	if snap.Index != defaultCompact {
+		t.Errorf("snap.Index = %d, want %d", snap.Index, defaultCompact)
+	}
+
+	for i := range hs {
+		es[len(hs)-i-1].Stop()
+	}
+	for i := range hs {
+		hs[len(hs)-i-1].Close()
+	}
+}
+
 func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) {
 	bootstrapper := 0
 	es := make([]*Server, number)

+ 9 - 0
etcd/participant.go

@@ -36,6 +36,7 @@ import (
 const (
 	defaultHeartbeat = 1
 	defaultElection  = 5
+	defaultCompact   = 10000
 
 	maxBufferedProposal = 128
 
@@ -213,6 +214,14 @@ func (p *participant) run() int64 {
 			log.Printf("id=%x participant.end\n", p.id)
 			return standbyMode
 		}
+		if p.node.EntsLen() > defaultCompact {
+			d, err := p.Save()
+			if err != nil {
+				panic(err)
+			}
+			p.node.Compact(d)
+			log.Printf("id=%x compacted index=\n", p.id)
+		}
 	}
 }
 

+ 5 - 0
raft/log.go

@@ -24,6 +24,7 @@ type raftLog struct {
 	committed int64
 	applied   int64
 	offset    int64
+	snapshot  Snapshot
 
 	// want a compact after the number of entries exceeds the threshold
 	// TODO(xiangli) size might be a better criteria
@@ -154,6 +155,10 @@ func (l *raftLog) compact(i int64) int64 {
 	return int64(len(l.ents))
 }
 
+func (l *raftLog) snap(d []byte, index, term int64, nodes []int64) {
+	l.snapshot = Snapshot{d, nodes, index, term}
+}
+
 func (l *raftLog) shouldCompact() bool {
 	return (l.applied - l.offset) > l.compactThreshold
 }

+ 12 - 0
raft/node.go

@@ -231,3 +231,15 @@ func (n *Node) UnstableState() State {
 	n.sm.clearState()
 	return s
 }
+
+func (n *Node) GetSnap() Snapshot {
+	return n.sm.raftLog.snapshot
+}
+
+func (n *Node) Compact(d []byte) {
+	n.sm.compact(d)
+}
+
+func (n *Node) EntsLen() int {
+	return len(n.sm.raftLog.ents)
+}

+ 5 - 0
raft/raft.go

@@ -513,6 +513,11 @@ func (sm *stateMachine) maybeCompact() bool {
 	return true
 }
 
+func (sm *stateMachine) compact(d []byte) {
+	sm.raftLog.snap(d, sm.raftLog.applied, sm.raftLog.term(sm.raftLog.applied), sm.nodes())
+	sm.raftLog.compact(sm.raftLog.applied)
+}
+
 // restore recovers the statemachine from a snapshot. It restores the log and the
 // configuration of statemachine. It calls the snapshoter to restore from the given
 // snapshot.

+ 0 - 45
raft/raft_test.go

@@ -774,51 +774,6 @@ func TestRecvMsgBeat(t *testing.T) {
 	}
 }
 
-func TestMaybeCompact(t *testing.T) {
-	tests := []struct {
-		snapshoter Snapshoter
-		applied    int64
-		wCompact   bool
-	}{
-		{nil, defaultCompactThreshold + 1, false},
-		{new(logSnapshoter), defaultCompactThreshold - 1, false},
-		{new(logSnapshoter), defaultCompactThreshold + 1, true},
-	}
-
-	for i, tt := range tests {
-		sm := newStateMachine(0, []int64{0, 1, 2})
-		sm.setSnapshoter(tt.snapshoter)
-		for i := 0; i < defaultCompactThreshold*2; i++ {
-			sm.raftLog.append(int64(i), Entry{Term: int64(i + 1)})
-		}
-		sm.raftLog.applied = tt.applied
-		sm.raftLog.committed = tt.applied
-
-		if g := sm.maybeCompact(); g != tt.wCompact {
-			t.Errorf("#%d: compact = %v, want %v", i, g, tt.wCompact)
-		}
-
-		if tt.wCompact {
-			s := sm.snapshoter.GetSnap()
-			if s.Index != tt.applied {
-				t.Errorf("#%d: snap.Index = %v, want %v", i, s.Index, tt.applied)
-			}
-			if s.Term != tt.applied {
-				t.Errorf("#%d: snap.Term = %v, want %v", i, s.Index, tt.applied)
-			}
-
-			w := sm.nodes()
-			sw := int64Slice(w)
-			sg := int64Slice(s.Nodes)
-			sort.Sort(sw)
-			sort.Sort(sg)
-			if !reflect.DeepEqual(sg, sw) {
-				t.Errorf("#%d: snap.Nodes = %+v, want %+v", i, sg, sw)
-			}
-		}
-	}
-}
-
 func TestRestore(t *testing.T) {
 	s := Snapshot{
 		Index: defaultCompactThreshold + 1,