فهرست منبع

raft: not set applied when restored from snapshot

applied is only updated by application level through Advance.
Yicheng Qin 11 سال پیش
والد
کامیت
7d0ffb3f12
4فایلهای تغییر یافته به همراه70 افزوده شده و 7 حذف شده
  1. 5 4
      raft/log.go
  2. 31 3
      raft/log_test.go
  3. 1 0
      raft/node.go
  4. 33 0
      raft/node_test.go

+ 5 - 4
raft/log.go

@@ -115,10 +115,12 @@ func (l *raftLog) unstableEnts() []pb.Entry {
 }
 
 // nextEnts returns all the available entries for execution.
-// all the returned entries will be marked as applied.
+// If applied is smaller than the index of snapshot, it returns all committed
+// entries after the index of snapshot.
 func (l *raftLog) nextEnts() (ents []pb.Entry) {
-	if l.committed > l.applied {
-		return l.slice(l.applied+1, l.committed+1)
+	off := max(l.applied, l.snapshot.Index)
+	if l.committed > off {
+		return l.slice(off+1, l.committed+1)
 	}
 	return nil
 }
@@ -211,7 +213,6 @@ func (l *raftLog) restore(s pb.Snapshot) {
 	l.ents = []pb.Entry{{Term: s.Term}}
 	l.unstable = s.Index + 1
 	l.committed = s.Index
-	l.applied = s.Index
 	l.offset = s.Index
 	l.snapshot = s
 }

+ 31 - 3
raft/log_test.go

@@ -326,6 +326,37 @@ func TestCompactionSideEffects(t *testing.T) {
 	}
 }
 
+func TestNextEnts(t *testing.T) {
+	snap := pb.Snapshot{Term: 1, Index: 3}
+	ents := []pb.Entry{
+		{Term: 1, Index: 3},
+		{Term: 1, Index: 4},
+		{Term: 1, Index: 5},
+		{Term: 1, Index: 6},
+	}
+	tests := []struct {
+		applied uint64
+		wents   []pb.Entry
+	}{
+		{0, ents[1:3]},
+		{3, ents[1:3]},
+		{4, ents[2:3]},
+		{5, nil},
+	}
+	for i, tt := range tests {
+		raftLog := newLog()
+		raftLog.restore(snap)
+		raftLog.load(ents)
+		raftLog.maybeCommit(5, 1)
+		raftLog.appliedTo(tt.applied)
+
+		ents := raftLog.nextEnts()
+		if !reflect.DeepEqual(ents, tt.wents) {
+			t.Errorf("#%d: ents = %+v, want %+v", i, ents, tt.wents)
+		}
+	}
+}
+
 func TestUnstableEnts(t *testing.T) {
 	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
 	tests := []struct {
@@ -435,9 +466,6 @@ func TestLogRestore(t *testing.T) {
 	if raftLog.offset != index {
 		t.Errorf("offset = %d, want %d", raftLog.offset, index)
 	}
-	if raftLog.applied != index {
-		t.Errorf("applied = %d, want %d", raftLog.applied, index)
-	}
 	if raftLog.committed != index {
 		t.Errorf("comitted = %d, want %d", raftLog.committed, index)
 	}

+ 1 - 0
raft/node.go

@@ -171,6 +171,7 @@ func RestartNode(id uint64, election, heartbeat int, snapshot *pb.Snapshot, st p
 	r := newRaft(id, nil, election, heartbeat)
 	if snapshot != nil {
 		r.restore(*snapshot)
+		r.raftLog.appliedTo(snapshot.Index)
 	}
 	if !isHardStateEqual(st, emptyState) {
 		r.loadState(st)

+ 33 - 0
raft/node_test.go

@@ -368,6 +368,39 @@ func TestNodeRestart(t *testing.T) {
 	}
 }
 
+func TestNodeRestartFromSnapshot(t *testing.T) {
+	snap := &raftpb.Snapshot{
+		Data:  []byte("some data"),
+		Nodes: []uint64{1, 2},
+		Index: 2,
+		Term:  1,
+	}
+	entries := []raftpb.Entry{
+		{Term: 1, Index: 2},
+		{Term: 1, Index: 3, Data: []byte("foo")},
+	}
+	st := raftpb.HardState{Term: 1, Commit: 3}
+
+	want := Ready{
+		HardState: emptyState,
+		// commit upto index commit index in st
+		CommittedEntries: entries[1:],
+	}
+
+	n := RestartNode(1, 10, 1, snap, st, entries)
+	if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
+		t.Errorf("g = %+v,\n             w   %+v", g, want)
+	} else {
+		n.Advance()
+	}
+
+	select {
+	case rd := <-n.Ready():
+		t.Errorf("unexpected Ready: %+v", rd)
+	case <-time.After(time.Millisecond):
+	}
+}
+
 // TestCompacts ensures Node.Compact creates a correct raft snapshot and compacts
 // the raft log (call raft.compact)
 func TestNodeCompact(t *testing.T) {