Browse Source

raft: do not restore snapshot if local raft has longer matching history

Raft should not restore the snapshot if it has longer matching history.
Or restoring snapshot might remove the matched entries.
Xiang Li 11 years ago
parent
commit
b3841afcc3
2 changed files with 41 additions and 0 deletions
  1. 7 0
      raft/raft.go
  2. 34 0
      raft/raft_test.go

+ 7 - 0
raft/raft.go

@@ -581,6 +581,13 @@ func (r *raft) restore(s pb.Snapshot) bool {
 	if s.Metadata.Index <= r.raftLog.committed {
 		return false
 	}
+	if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
+		log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
+			r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
+		r.raftLog.commitTo(s.Metadata.Index)
+		return false
+	}
+
 	log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
 		r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
 

+ 34 - 0
raft/raft_test.go

@@ -1121,6 +1121,40 @@ func TestRestore(t *testing.T) {
 	}
 }
 
+func TestRestoreIgnoreSnapshot(t *testing.T) {
+	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
+	commit := uint64(1)
+	storage := NewMemoryStorage()
+	sm := newRaft(1, []uint64{1, 2}, 10, 1, storage)
+	sm.raftLog.append(previousEnts...)
+	sm.raftLog.commitTo(commit)
+
+	s := pb.Snapshot{
+		Metadata: pb.SnapshotMetadata{
+			Index:     commit,
+			Term:      1,
+			ConfState: pb.ConfState{Nodes: []uint64{1, 2}},
+		},
+	}
+
+	// ignore snapshot
+	if ok := sm.restore(s); ok {
+		t.Errorf("restore = %t, want %t", ok, false)
+	}
+	if sm.raftLog.committed != commit {
+		t.Errorf("commit = %d, want %d", sm.raftLog.committed, commit)
+	}
+
+	// ignore snapshot and fast forward commit
+	s.Metadata.Index = commit + 1
+	if ok := sm.restore(s); ok {
+		t.Errorf("restore = %t, want %t", ok, false)
+	}
+	if sm.raftLog.committed != commit+1 {
+		t.Errorf("commit = %d, want %d", sm.raftLog.committed, commit+1)
+	}
+}
+
 func TestProvideSnap(t *testing.T) {
 	// restore the statemachin from a snapshot
 	// so it has a compacted log and a snapshot