|
|
@@ -22,14 +22,17 @@ import (
|
|
|
"math"
|
|
|
"math/rand"
|
|
|
"reflect"
|
|
|
- "sort"
|
|
|
"testing"
|
|
|
|
|
|
pb "github.com/coreos/etcd/raft/raftpb"
|
|
|
)
|
|
|
|
|
|
// nextEnts returns the appliable entries and updates the applied index
|
|
|
-func nextEnts(r *raft) (ents []pb.Entry) {
|
|
|
+func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
|
|
|
+ // Transfer all unstable entries to "stable" storage.
|
|
|
+ s.Append(r.raftLog.unstableEntries())
|
|
|
+ r.raftLog.stableTo(r.raftLog.lastIndex())
|
|
|
+
|
|
|
ents = r.raftLog.nextEnts()
|
|
|
r.raftLog.appliedTo(r.raftLog.committed)
|
|
|
return ents
|
|
|
@@ -209,7 +212,7 @@ func TestLogReplication(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
ents := []pb.Entry{}
|
|
|
- for _, e := range nextEnts(sm) {
|
|
|
+ for _, e := range nextEnts(sm, tt.network.storage[j]) {
|
|
|
if e.Data != nil {
|
|
|
ents = append(ents, e)
|
|
|
}
|
|
|
@@ -318,9 +321,9 @@ func TestCommitWithoutNewTermEntry(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestDuelingCandidates(t *testing.T) {
|
|
|
- a := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
|
|
- b := newRaft(2, []uint64{1, 2, 3}, 10, 1)
|
|
|
- c := newRaft(3, []uint64{1, 2, 3}, 10, 1)
|
|
|
+ a := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
|
|
+ b := newRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
|
|
+ c := newRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
|
|
|
|
|
nt := newNetwork(a, b, c)
|
|
|
nt.cut(1, 3)
|
|
|
@@ -331,7 +334,11 @@ func TestDuelingCandidates(t *testing.T) {
|
|
|
nt.recover()
|
|
|
nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
|
|
|
|
|
|
- wlog := &raftLog{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}, committed: 1}
|
|
|
+ wlog := &raftLog{
|
|
|
+ storage: &MemoryStorage{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}},
|
|
|
+ committed: 1,
|
|
|
+ unstable: 2,
|
|
|
+ }
|
|
|
tests := []struct {
|
|
|
sm *raft
|
|
|
state StateType
|
|
|
@@ -340,7 +347,7 @@ func TestDuelingCandidates(t *testing.T) {
|
|
|
}{
|
|
|
{a, StateFollower, 2, wlog},
|
|
|
{b, StateFollower, 2, wlog},
|
|
|
- {c, StateFollower, 2, newLog()},
|
|
|
+ {c, StateFollower, 2, newLog(NewMemoryStorage())},
|
|
|
}
|
|
|
|
|
|
for i, tt := range tests {
|
|
|
@@ -383,7 +390,13 @@ func TestCandidateConcede(t *testing.T) {
|
|
|
if g := a.Term; g != 1 {
|
|
|
t.Errorf("term = %d, want %d", g, 1)
|
|
|
}
|
|
|
- wantLog := ltoa(&raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2})
|
|
|
+ wantLog := ltoa(&raftLog{
|
|
|
+ storage: &MemoryStorage{
|
|
|
+ ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
|
|
|
+ },
|
|
|
+ unstable: 3,
|
|
|
+ committed: 2,
|
|
|
+ })
|
|
|
for i, p := range tt.peers {
|
|
|
if sm, ok := p.(*raft); ok {
|
|
|
l := ltoa(sm.raftLog)
|
|
|
@@ -416,10 +429,13 @@ func TestOldMessages(t *testing.T) {
|
|
|
tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgApp, Term: 1, Entries: []pb.Entry{{Term: 1}}})
|
|
|
|
|
|
l := &raftLog{
|
|
|
- ents: []pb.Entry{
|
|
|
- {}, {Data: nil, Term: 1, Index: 1},
|
|
|
- {Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
|
|
|
+ storage: &MemoryStorage{
|
|
|
+ ents: []pb.Entry{
|
|
|
+ {}, {Data: nil, Term: 1, Index: 1},
|
|
|
+ {Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
|
|
|
+ },
|
|
|
},
|
|
|
+ unstable: 4,
|
|
|
committed: 3,
|
|
|
}
|
|
|
base := ltoa(l)
|
|
|
@@ -470,9 +486,14 @@ func TestProposal(t *testing.T) {
|
|
|
send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
|
|
|
send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
|
|
|
|
|
|
- wantLog := newLog()
|
|
|
+ wantLog := newLog(NewMemoryStorage())
|
|
|
if tt.success {
|
|
|
- wantLog = &raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2}
|
|
|
+ wantLog = &raftLog{
|
|
|
+ storage: &MemoryStorage{
|
|
|
+ ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
|
|
|
+ },
|
|
|
+ unstable: 3,
|
|
|
+ committed: 2}
|
|
|
}
|
|
|
base := ltoa(wantLog)
|
|
|
for i, p := range tt.peers {
|
|
|
@@ -506,7 +527,12 @@ func TestProposalByProxy(t *testing.T) {
|
|
|
// propose via follower
|
|
|
tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
|
|
|
|
|
- wantLog := &raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, committed: 2}
|
|
|
+ wantLog := &raftLog{
|
|
|
+ storage: &MemoryStorage{
|
|
|
+ ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}},
|
|
|
+ },
|
|
|
+ unstable: 3,
|
|
|
+ committed: 2}
|
|
|
base := ltoa(wantLog)
|
|
|
for i, p := range tt.peers {
|
|
|
if sm, ok := p.(*raft); ok {
|
|
|
@@ -525,50 +551,6 @@ func TestProposalByProxy(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func TestCompact(t *testing.T) {
|
|
|
- tests := []struct {
|
|
|
- compacti uint64
|
|
|
- nodes []uint64
|
|
|
- snapd []byte
|
|
|
- wpanic bool
|
|
|
- }{
|
|
|
- {1, []uint64{1, 2, 3}, []byte("some data"), false},
|
|
|
- {2, []uint64{1, 2, 3}, []byte("some data"), false},
|
|
|
- {4, []uint64{1, 2, 3}, []byte("some data"), true}, // compact out of range
|
|
|
- }
|
|
|
-
|
|
|
- for i, tt := range tests {
|
|
|
- func() {
|
|
|
- defer func() {
|
|
|
- if r := recover(); r != nil {
|
|
|
- if tt.wpanic != true {
|
|
|
- t.Errorf("%d: panic = %v, want %v", i, true, tt.wpanic)
|
|
|
- }
|
|
|
- }
|
|
|
- }()
|
|
|
- sm := &raft{
|
|
|
- state: StateLeader,
|
|
|
- raftLog: &raftLog{
|
|
|
- committed: 2,
|
|
|
- applied: 2,
|
|
|
- ents: []pb.Entry{{}, {Term: 1}, {Term: 1}, {Term: 1}},
|
|
|
- },
|
|
|
- }
|
|
|
- sm.compact(tt.compacti, tt.nodes, tt.snapd)
|
|
|
- sort.Sort(uint64Slice(sm.raftLog.snapshot.Nodes))
|
|
|
- if sm.raftLog.offset != tt.compacti {
|
|
|
- t.Errorf("%d: log.offset = %d, want %d", i, sm.raftLog.offset, tt.compacti)
|
|
|
- }
|
|
|
- if !reflect.DeepEqual(sm.raftLog.snapshot.Nodes, tt.nodes) {
|
|
|
- t.Errorf("%d: snap.nodes = %v, want %v", i, sm.raftLog.snapshot.Nodes, tt.nodes)
|
|
|
- }
|
|
|
- if !reflect.DeepEqual(sm.raftLog.snapshot.Data, tt.snapd) {
|
|
|
- t.Errorf("%d: snap.data = %v, want %v", i, sm.raftLog.snapshot.Data, tt.snapd)
|
|
|
- }
|
|
|
- }()
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
func TestCommit(t *testing.T) {
|
|
|
tests := []struct {
|
|
|
matches []uint64
|
|
|
@@ -602,7 +584,11 @@ func TestCommit(t *testing.T) {
|
|
|
for j := 0; j < len(tt.matches); j++ {
|
|
|
prs[uint64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
|
|
|
}
|
|
|
- sm := &raft{raftLog: &raftLog{ents: tt.logs}, prs: prs, HardState: pb.HardState{Term: tt.smTerm}}
|
|
|
+ sm := &raft{
|
|
|
+ raftLog: &raftLog{storage: &MemoryStorage{ents: tt.logs}, unstable: uint64(len(tt.logs))},
|
|
|
+ prs: prs,
|
|
|
+ HardState: pb.HardState{Term: tt.smTerm},
|
|
|
+ }
|
|
|
sm.maybeCommit()
|
|
|
if g := sm.raftLog.committed; g != tt.w {
|
|
|
t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
|
|
|
@@ -624,7 +610,7 @@ func TestIsElectionTimeout(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
for i, tt := range tests {
|
|
|
- sm := newRaft(1, []uint64{1}, 10, 1)
|
|
|
+ sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
|
|
sm.elapsed = tt.elapse
|
|
|
c := 0
|
|
|
for j := 0; j < 10000; j++ {
|
|
|
@@ -649,7 +635,7 @@ func TestStepIgnoreOldTermMsg(t *testing.T) {
|
|
|
fakeStep := func(r *raft, m pb.Message) {
|
|
|
called = true
|
|
|
}
|
|
|
- sm := newRaft(1, []uint64{1}, 10, 1)
|
|
|
+ sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
|
|
sm.step = fakeStep
|
|
|
sm.Term = 2
|
|
|
sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1})
|
|
|
@@ -692,7 +678,11 @@ func TestHandleMsgApp(t *testing.T) {
|
|
|
sm := &raft{
|
|
|
state: StateFollower,
|
|
|
HardState: pb.HardState{Term: 2},
|
|
|
- raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
|
|
|
+ raftLog: &raftLog{
|
|
|
+ committed: 0,
|
|
|
+ storage: &MemoryStorage{ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
|
|
|
+ unstable: 3,
|
|
|
+ },
|
|
|
}
|
|
|
|
|
|
sm.handleAppendEntries(tt.m)
|
|
|
@@ -724,10 +714,12 @@ func TestHandleHeartbeat(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
for i, tt := range tests {
|
|
|
+ storage := NewMemoryStorage()
|
|
|
+ storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
|
|
|
sm := &raft{
|
|
|
state: StateFollower,
|
|
|
HardState: pb.HardState{Term: 2},
|
|
|
- raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}, {Term: 3}}},
|
|
|
+ raftLog: newLog(storage),
|
|
|
}
|
|
|
sm.raftLog.commitTo(commit)
|
|
|
sm.handleHeartbeat(tt.m)
|
|
|
@@ -776,7 +768,7 @@ func TestRecvMsgVote(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
for i, tt := range tests {
|
|
|
- sm := newRaft(1, []uint64{1}, 10, 1)
|
|
|
+ sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
|
|
sm.state = tt.state
|
|
|
switch tt.state {
|
|
|
case StateFollower:
|
|
|
@@ -787,7 +779,10 @@ func TestRecvMsgVote(t *testing.T) {
|
|
|
sm.step = stepLeader
|
|
|
}
|
|
|
sm.HardState = pb.HardState{Vote: tt.voteFor}
|
|
|
- sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}}
|
|
|
+ sm.raftLog = &raftLog{
|
|
|
+ storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 2}, {Index: 2, Term: 2}}},
|
|
|
+ unstable: 3,
|
|
|
+ }
|
|
|
|
|
|
sm.Step(pb.Message{Type: pb.MsgVote, From: 2, Index: tt.i, LogTerm: tt.term})
|
|
|
|
|
|
@@ -833,7 +828,7 @@ func TestStateTransition(t *testing.T) {
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
- sm := newRaft(1, []uint64{1}, 10, 1)
|
|
|
+ sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
|
|
sm.state = tt.from
|
|
|
|
|
|
switch tt.to {
|
|
|
@@ -863,16 +858,16 @@ func TestAllServerStepdown(t *testing.T) {
|
|
|
wterm uint64
|
|
|
windex uint64
|
|
|
}{
|
|
|
- {StateFollower, StateFollower, 3, 1},
|
|
|
- {StateCandidate, StateFollower, 3, 1},
|
|
|
- {StateLeader, StateFollower, 3, 2},
|
|
|
+ {StateFollower, StateFollower, 3, 0},
|
|
|
+ {StateCandidate, StateFollower, 3, 0},
|
|
|
+ {StateLeader, StateFollower, 3, 1},
|
|
|
}
|
|
|
|
|
|
tmsgTypes := [...]pb.MessageType{pb.MsgVote, pb.MsgApp}
|
|
|
tterm := uint64(3)
|
|
|
|
|
|
for i, tt := range tests {
|
|
|
- sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
|
|
+ sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
|
|
switch tt.state {
|
|
|
case StateFollower:
|
|
|
sm.becomeFollower(1, None)
|
|
|
@@ -892,8 +887,11 @@ func TestAllServerStepdown(t *testing.T) {
|
|
|
if sm.Term != tt.wterm {
|
|
|
t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm)
|
|
|
}
|
|
|
- if uint64(len(sm.raftLog.ents)) != tt.windex {
|
|
|
- t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.raftLog.ents), tt.windex)
|
|
|
+ if uint64(sm.raftLog.lastIndex()) != tt.windex {
|
|
|
+ t.Errorf("#%d.%d index = %v , want %v", i, j, sm.raftLog.lastIndex(), tt.windex)
|
|
|
+ }
|
|
|
+ if uint64(len(sm.raftLog.allEntries())) != tt.windex {
|
|
|
+ t.Errorf("#%d.%d len(ents) = %v , want %v", i, j, len(sm.raftLog.allEntries()), tt.windex)
|
|
|
}
|
|
|
wlead := uint64(2)
|
|
|
if msgType == pb.MsgVote {
|
|
|
@@ -928,8 +926,11 @@ func TestLeaderAppResp(t *testing.T) {
|
|
|
for i, tt := range tests {
|
|
|
// sm term is 1 after it becomes the leader.
|
|
|
// thus the last log term must be 1 to be committed.
|
|
|
- sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
|
|
- sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
|
|
|
+ sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
|
|
+ sm.raftLog = &raftLog{
|
|
|
+ storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}},
|
|
|
+ unstable: 3,
|
|
|
+ }
|
|
|
sm.becomeCandidate()
|
|
|
sm.becomeLeader()
|
|
|
sm.readMessages()
|
|
|
@@ -965,18 +966,21 @@ func TestBcastBeat(t *testing.T) {
|
|
|
offset := uint64(1000)
|
|
|
// make a state machine with log.offset = 1000
|
|
|
s := pb.Snapshot{
|
|
|
- Index: offset,
|
|
|
- Term: 1,
|
|
|
- Nodes: []uint64{1, 2, 3},
|
|
|
+ Metadata: pb.SnapshotMetadata{
|
|
|
+ Index: offset,
|
|
|
+ Term: 1,
|
|
|
+ ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}},
|
|
|
+ },
|
|
|
}
|
|
|
- sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
|
|
+ storage := NewMemoryStorage()
|
|
|
+ storage.ApplySnapshot(s)
|
|
|
+ sm := newRaft(1, nil, 10, 1, storage)
|
|
|
sm.Term = 1
|
|
|
- sm.restore(s)
|
|
|
|
|
|
sm.becomeCandidate()
|
|
|
sm.becomeLeader()
|
|
|
for i := 0; i < 10; i++ {
|
|
|
- sm.appendEntry(pb.Entry{})
|
|
|
+ sm.appendEntry(pb.Entry{Index: uint64(i) + 1})
|
|
|
}
|
|
|
// slow follower
|
|
|
sm.prs[2].match, sm.prs[2].next = 5, 6
|
|
|
@@ -1029,8 +1033,8 @@ func TestRecvMsgBeat(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
for i, tt := range tests {
|
|
|
- sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
|
|
- sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
|
|
|
+ sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
|
|
+ sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}}
|
|
|
sm.Term = 1
|
|
|
sm.state = tt.state
|
|
|
switch tt.state {
|
|
|
@@ -1072,7 +1076,7 @@ func TestLeaderIncreaseNext(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
for i, tt := range tests {
|
|
|
- sm := newRaft(1, []uint64{1, 2}, 10, 1)
|
|
|
+ sm := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
|
|
sm.raftLog.append(0, previousEnts...)
|
|
|
sm.becomeCandidate()
|
|
|
sm.becomeLeader()
|
|
|
@@ -1088,28 +1092,28 @@ func TestLeaderIncreaseNext(t *testing.T) {
|
|
|
|
|
|
func TestRestore(t *testing.T) {
|
|
|
s := pb.Snapshot{
|
|
|
- Index: 11, // magic number
|
|
|
- Term: 11, // magic number
|
|
|
- Nodes: []uint64{1, 2, 3},
|
|
|
+ Metadata: pb.SnapshotMetadata{
|
|
|
+ Index: 11, // magic number
|
|
|
+ Term: 11, // magic number
|
|
|
+ ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}},
|
|
|
+ },
|
|
|
}
|
|
|
|
|
|
- sm := newRaft(1, []uint64{1, 2}, 10, 1)
|
|
|
+ storage := NewMemoryStorage()
|
|
|
+ sm := newRaft(1, []uint64{1, 2}, 10, 1, storage)
|
|
|
if ok := sm.restore(s); !ok {
|
|
|
t.Fatal("restore fail, want succeed")
|
|
|
}
|
|
|
|
|
|
- if sm.raftLog.lastIndex() != s.Index {
|
|
|
- t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Index)
|
|
|
+ if sm.raftLog.lastIndex() != s.Metadata.Index {
|
|
|
+ t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index)
|
|
|
}
|
|
|
- if sm.raftLog.term(s.Index) != s.Term {
|
|
|
- t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Index), s.Term)
|
|
|
+ if sm.raftLog.term(s.Metadata.Index) != s.Metadata.Term {
|
|
|
+ t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Metadata.Index), s.Metadata.Term)
|
|
|
}
|
|
|
sg := sm.nodes()
|
|
|
- if !reflect.DeepEqual(sg, s.Nodes) {
|
|
|
- t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Nodes)
|
|
|
- }
|
|
|
- if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
|
|
|
- t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
|
|
|
+ if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) {
|
|
|
+ t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Metadata.ConfState.Nodes)
|
|
|
}
|
|
|
|
|
|
if ok := sm.restore(s); ok {
|
|
|
@@ -1118,14 +1122,17 @@ func TestRestore(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestProvideSnap(t *testing.T) {
|
|
|
- s := pb.Snapshot{
|
|
|
- Index: 11, // magic number
|
|
|
- Term: 11, // magic number
|
|
|
- Nodes: []uint64{1, 2},
|
|
|
- }
|
|
|
- sm := newRaft(1, []uint64{1}, 10, 1)
|
|
|
// restore the statemachin from a snapshot
|
|
|
// so it has a compacted log and a snapshot
|
|
|
+ s := pb.Snapshot{
|
|
|
+ Metadata: pb.SnapshotMetadata{
|
|
|
+ Index: 11, // magic number
|
|
|
+ Term: 11, // magic number
|
|
|
+ ConfState: pb.ConfState{Nodes: []uint64{1, 2}},
|
|
|
+ },
|
|
|
+ }
|
|
|
+ storage := NewMemoryStorage()
|
|
|
+ sm := newRaft(1, []uint64{1}, 10, 1, storage)
|
|
|
sm.restore(s)
|
|
|
|
|
|
sm.becomeCandidate()
|
|
|
@@ -1133,7 +1140,7 @@ func TestProvideSnap(t *testing.T) {
|
|
|
|
|
|
// force set the next of node 1, so that
|
|
|
// node 1 needs a snapshot
|
|
|
- sm.prs[2].next = sm.raftLog.offset
|
|
|
+ sm.prs[2].next = sm.raftLog.firstIndex()
|
|
|
|
|
|
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].next - 1, Reject: true})
|
|
|
msgs := sm.readMessages()
|
|
|
@@ -1148,18 +1155,18 @@ func TestProvideSnap(t *testing.T) {
|
|
|
|
|
|
func TestRestoreFromSnapMsg(t *testing.T) {
|
|
|
s := pb.Snapshot{
|
|
|
- Index: 11, // magic number
|
|
|
- Term: 11, // magic number
|
|
|
- Nodes: []uint64{1, 2},
|
|
|
+ Metadata: pb.SnapshotMetadata{
|
|
|
+ Index: 11, // magic number
|
|
|
+ Term: 11, // magic number
|
|
|
+ ConfState: pb.ConfState{Nodes: []uint64{1, 2}},
|
|
|
+ },
|
|
|
}
|
|
|
m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
|
|
|
|
|
|
- sm := newRaft(2, []uint64{1, 2}, 10, 1)
|
|
|
+ sm := newRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
|
|
sm.Step(m)
|
|
|
|
|
|
- if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
|
|
|
- t.Errorf("snapshot = %+v, want %+v", sm.raftLog.snapshot, s)
|
|
|
- }
|
|
|
+ // TODO(bdarnell): what should this test?
|
|
|
}
|
|
|
|
|
|
func TestSlowNodeRestore(t *testing.T) {
|
|
|
@@ -1171,16 +1178,13 @@ func TestSlowNodeRestore(t *testing.T) {
|
|
|
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
|
|
|
}
|
|
|
lead := nt.peers[1].(*raft)
|
|
|
- nextEnts(lead)
|
|
|
- lead.compact(lead.raftLog.applied, lead.nodes(), nil)
|
|
|
+ nextEnts(lead, nt.storage[1])
|
|
|
+ nt.storage[1].Compact(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil)
|
|
|
|
|
|
nt.recover()
|
|
|
// trigger a snapshot
|
|
|
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
|
|
|
follower := nt.peers[3].(*raft)
|
|
|
- if !reflect.DeepEqual(follower.raftLog.snapshot, lead.raftLog.snapshot) {
|
|
|
- t.Errorf("follower.snap = %+v, want %+v", follower.raftLog.snapshot, lead.raftLog.snapshot)
|
|
|
- }
|
|
|
|
|
|
// trigger a commit
|
|
|
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
|
|
|
@@ -1193,7 +1197,7 @@ func TestSlowNodeRestore(t *testing.T) {
|
|
|
// it appends the entry to log and sets pendingConf to be true.
|
|
|
func TestStepConfig(t *testing.T) {
|
|
|
// a raft that cannot make progress
|
|
|
- r := newRaft(1, []uint64{1, 2}, 10, 1)
|
|
|
+ r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
|
|
r.becomeCandidate()
|
|
|
r.becomeLeader()
|
|
|
index := r.raftLog.lastIndex()
|
|
|
@@ -1211,7 +1215,7 @@ func TestStepConfig(t *testing.T) {
|
|
|
// the proposal and keep its original state.
|
|
|
func TestStepIgnoreConfig(t *testing.T) {
|
|
|
// a raft that cannot make progress
|
|
|
- r := newRaft(1, []uint64{1, 2}, 10, 1)
|
|
|
+ r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
|
|
r.becomeCandidate()
|
|
|
r.becomeLeader()
|
|
|
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
|
|
|
@@ -1237,7 +1241,7 @@ func TestRecoverPendingConfig(t *testing.T) {
|
|
|
{pb.EntryConfChange, true},
|
|
|
}
|
|
|
for i, tt := range tests {
|
|
|
- r := newRaft(1, []uint64{1, 2}, 10, 1)
|
|
|
+ r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
|
|
r.appendEntry(pb.Entry{Type: tt.entType})
|
|
|
r.becomeCandidate()
|
|
|
r.becomeLeader()
|
|
|
@@ -1256,7 +1260,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
|
|
|
t.Errorf("expect panic, but nothing happens")
|
|
|
}
|
|
|
}()
|
|
|
- r := newRaft(1, []uint64{1, 2}, 10, 1)
|
|
|
+ r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
|
|
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
|
|
|
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
|
|
|
r.becomeCandidate()
|
|
|
@@ -1266,7 +1270,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
|
|
|
|
|
|
// TestAddNode tests that addNode could update pendingConf and nodes correctly.
|
|
|
func TestAddNode(t *testing.T) {
|
|
|
- r := newRaft(1, []uint64{1}, 10, 1)
|
|
|
+ r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
|
|
r.pendingConf = true
|
|
|
r.addNode(2)
|
|
|
if r.pendingConf != false {
|
|
|
@@ -1282,7 +1286,7 @@ func TestAddNode(t *testing.T) {
|
|
|
// TestRemoveNode tests that removeNode could update pendingConf, nodes and
|
|
|
// and removed list correctly.
|
|
|
func TestRemoveNode(t *testing.T) {
|
|
|
- r := newRaft(1, []uint64{1, 2}, 10, 1)
|
|
|
+ r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
|
|
r.pendingConf = true
|
|
|
r.removeNode(2)
|
|
|
if r.pendingConf != false {
|
|
|
@@ -1331,7 +1335,7 @@ func TestRaftNodes(t *testing.T) {
|
|
|
},
|
|
|
}
|
|
|
for i, tt := range tests {
|
|
|
- r := newRaft(1, tt.ids, 10, 1)
|
|
|
+ r := newRaft(1, tt.ids, 10, 1, NewMemoryStorage())
|
|
|
if !reflect.DeepEqual(r.nodes(), tt.wids) {
|
|
|
t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids)
|
|
|
}
|
|
|
@@ -1340,17 +1344,23 @@ func TestRaftNodes(t *testing.T) {
|
|
|
|
|
|
func ents(terms ...uint64) *raft {
|
|
|
ents := []pb.Entry{{}}
|
|
|
- for _, term := range terms {
|
|
|
- ents = append(ents, pb.Entry{Term: term})
|
|
|
+ for i, term := range terms {
|
|
|
+ ents = append(ents, pb.Entry{Index: uint64(i), Term: term})
|
|
|
}
|
|
|
|
|
|
- sm := &raft{raftLog: &raftLog{ents: ents}}
|
|
|
+ sm := &raft{
|
|
|
+ raftLog: &raftLog{
|
|
|
+ storage: &MemoryStorage{ents: ents},
|
|
|
+ unstable: uint64(len(ents)),
|
|
|
+ },
|
|
|
+ }
|
|
|
sm.reset(0)
|
|
|
return sm
|
|
|
}
|
|
|
|
|
|
type network struct {
|
|
|
peers map[uint64]Interface
|
|
|
+ storage map[uint64]*MemoryStorage
|
|
|
dropm map[connem]float64
|
|
|
ignorem map[pb.MessageType]bool
|
|
|
}
|
|
|
@@ -1364,12 +1374,14 @@ func newNetwork(peers ...Interface) *network {
|
|
|
peerAddrs := idsBySize(size)
|
|
|
|
|
|
npeers := make(map[uint64]Interface, size)
|
|
|
+ nstorage := make(map[uint64]*MemoryStorage, size)
|
|
|
|
|
|
for i, p := range peers {
|
|
|
id := peerAddrs[i]
|
|
|
switch v := p.(type) {
|
|
|
case nil:
|
|
|
- sm := newRaft(id, peerAddrs, 10, 1)
|
|
|
+ nstorage[id] = NewMemoryStorage()
|
|
|
+ sm := newRaft(id, peerAddrs, 10, 1, nstorage[id])
|
|
|
npeers[id] = sm
|
|
|
case *raft:
|
|
|
v.id = id
|
|
|
@@ -1387,6 +1399,7 @@ func newNetwork(peers ...Interface) *network {
|
|
|
}
|
|
|
return &network{
|
|
|
peers: npeers,
|
|
|
+ storage: nstorage,
|
|
|
dropm: make(map[connem]float64),
|
|
|
ignorem: make(map[pb.MessageType]bool),
|
|
|
}
|