|
@@ -160,15 +160,6 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
|
|
|
|
|
|
|
|
tt.recover()
|
|
tt.recover()
|
|
|
|
|
|
|
|
- // send out a heartbeat
|
|
|
|
|
- // after append a ChangeTerm entry from the current term, all entries
|
|
|
|
|
- // should be committed
|
|
|
|
|
- tt.send(pb.Message{From: 2, To: 2, Type: msgBeat})
|
|
|
|
|
-
|
|
|
|
|
- if sm.raftLog.committed != 4 {
|
|
|
|
|
- t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
// still be able to append a entry
|
|
// still be able to append a entry
|
|
|
tt.send(pb.Message{From: 2, To: 2, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
|
|
tt.send(pb.Message{From: 2, To: 2, Type: msgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
|
|
|
|
|
|
|
@@ -729,17 +720,16 @@ func TestLeaderAppResp(t *testing.T) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// When the leader receives a heartbeat tick, it should
|
|
// When the leader receives a heartbeat tick, it should
|
|
|
-// send a msgApp with m.Index = max(progress.next-1,log.offset) and empty
|
|
|
|
|
-// entries.
|
|
|
|
|
|
|
+// send a msgApp with m.Index = 0, m.LogTerm=0 and empty entries.
|
|
|
func TestBcastBeat(t *testing.T) {
|
|
func TestBcastBeat(t *testing.T) {
|
|
|
offset := int64(1000)
|
|
offset := int64(1000)
|
|
|
// make a state machine with log.offset = 1000
|
|
// make a state machine with log.offset = 1000
|
|
|
s := pb.Snapshot{
|
|
s := pb.Snapshot{
|
|
|
Index: offset,
|
|
Index: offset,
|
|
|
Term: 1,
|
|
Term: 1,
|
|
|
- Nodes: []int64{1, 2},
|
|
|
|
|
|
|
+ Nodes: []int64{1, 2, 3},
|
|
|
}
|
|
}
|
|
|
- sm := newRaft(1, []int64{1, 2}, 0, 0)
|
|
|
|
|
|
|
+ sm := newRaft(1, []int64{1, 2, 3}, 0, 0)
|
|
|
sm.Term = 1
|
|
sm.Term = 1
|
|
|
sm.restore(s)
|
|
sm.restore(s)
|
|
|
|
|
|
|
@@ -749,40 +739,26 @@ func TestBcastBeat(t *testing.T) {
|
|
|
sm.appendEntry(pb.Entry{})
|
|
sm.appendEntry(pb.Entry{})
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- tests := []struct {
|
|
|
|
|
- pnext int64
|
|
|
|
|
- windex int64
|
|
|
|
|
- wterm int64
|
|
|
|
|
- wto int64
|
|
|
|
|
- }{
|
|
|
|
|
- {offset + 1, offset, 1, 2},
|
|
|
|
|
- {offset + 2, offset + 1, 2, 2},
|
|
|
|
|
- // pr.next -1 < offset
|
|
|
|
|
- {offset, offset, 1, 2},
|
|
|
|
|
- {offset - 1, offset, 1, 2},
|
|
|
|
|
|
|
+ sm.Step(pb.Message{Type: msgBeat})
|
|
|
|
|
+ msgs := sm.ReadMessages()
|
|
|
|
|
+ if len(msgs) != 2 {
|
|
|
|
|
+ t.Fatalf("len(msgs) = %v, want 1", len(msgs))
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- for i, tt := range tests {
|
|
|
|
|
- sm.prs[2].match = 0
|
|
|
|
|
- sm.prs[2].next = tt.pnext
|
|
|
|
|
-
|
|
|
|
|
- sm.Step(pb.Message{Type: msgBeat})
|
|
|
|
|
- msgs := sm.ReadMessages()
|
|
|
|
|
- if len(msgs) != 1 {
|
|
|
|
|
- t.Fatalf("#%d: len(msgs) = %v, want 1", i, len(msgs))
|
|
|
|
|
- }
|
|
|
|
|
- m := msgs[0]
|
|
|
|
|
|
|
+ tomap := map[int64]bool{2: true, 3: true}
|
|
|
|
|
+ for i, m := range msgs {
|
|
|
if m.Type != msgApp {
|
|
if m.Type != msgApp {
|
|
|
t.Fatalf("#%d: type = %v, want = %v", i, m.Type, msgApp)
|
|
t.Fatalf("#%d: type = %v, want = %v", i, m.Type, msgApp)
|
|
|
}
|
|
}
|
|
|
- if m.Index != tt.windex {
|
|
|
|
|
- t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, tt.windex)
|
|
|
|
|
|
|
+ if m.Index != 0 {
|
|
|
|
|
+ t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0)
|
|
|
}
|
|
}
|
|
|
- if m.LogTerm != tt.wterm {
|
|
|
|
|
- t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, tt.wterm)
|
|
|
|
|
|
|
+ if m.LogTerm != 0 {
|
|
|
|
|
+ t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
|
|
|
}
|
|
}
|
|
|
- if m.To != tt.wto {
|
|
|
|
|
- t.Fatalf("#%d: to = %d, want %d", i, m.To, tt.wto)
|
|
|
|
|
|
|
+ if !tomap[m.To] {
|
|
|
|
|
+ t.Fatalf("#%d: unexpected to %d", i, m.To)
|
|
|
|
|
+ } else {
|
|
|
|
|
+ delete(tomap, m.To)
|
|
|
}
|
|
}
|
|
|
if len(m.Entries) != 0 {
|
|
if len(m.Entries) != 0 {
|
|
|
t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
|
|
t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
|
|
@@ -877,26 +853,16 @@ func TestProvideSnap(t *testing.T) {
|
|
|
sm.becomeCandidate()
|
|
sm.becomeCandidate()
|
|
|
sm.becomeLeader()
|
|
sm.becomeLeader()
|
|
|
|
|
|
|
|
- sm.Step(pb.Message{From: 1, To: 1, Type: msgBeat})
|
|
|
|
|
- msgs := sm.ReadMessages()
|
|
|
|
|
- if len(msgs) != 1 {
|
|
|
|
|
- t.Fatalf("len(msgs) = %d, want 1", len(msgs))
|
|
|
|
|
- }
|
|
|
|
|
- m := msgs[0]
|
|
|
|
|
- if m.Type != msgApp {
|
|
|
|
|
- t.Errorf("m.Type = %v, want %v", m.Type, msgApp)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
// force set the next of node 1, so that
|
|
// force set the next of node 1, so that
|
|
|
// node 1 needs a snapshot
|
|
// node 1 needs a snapshot
|
|
|
sm.prs[2].next = sm.raftLog.offset
|
|
sm.prs[2].next = sm.raftLog.offset
|
|
|
|
|
|
|
|
sm.Step(pb.Message{From: 2, To: 1, Type: msgAppResp, Index: -1, Denied: true})
|
|
sm.Step(pb.Message{From: 2, To: 1, Type: msgAppResp, Index: -1, Denied: true})
|
|
|
- msgs = sm.ReadMessages()
|
|
|
|
|
|
|
+ msgs := sm.ReadMessages()
|
|
|
if len(msgs) != 1 {
|
|
if len(msgs) != 1 {
|
|
|
t.Fatalf("len(msgs) = %d, want 1", len(msgs))
|
|
t.Fatalf("len(msgs) = %d, want 1", len(msgs))
|
|
|
}
|
|
}
|
|
|
- m = msgs[0]
|
|
|
|
|
|
|
+ m := msgs[0]
|
|
|
if m.Type != msgSnap {
|
|
if m.Type != msgSnap {
|
|
|
t.Errorf("m.Type = %v, want %v", m.Type, msgSnap)
|
|
t.Errorf("m.Type = %v, want %v", m.Type, msgSnap)
|
|
|
}
|
|
}
|
|
@@ -931,17 +897,17 @@ func TestSlowNodeRestore(t *testing.T) {
|
|
|
lead.compact(nil)
|
|
lead.compact(nil)
|
|
|
|
|
|
|
|
nt.recover()
|
|
nt.recover()
|
|
|
- nt.send(pb.Message{From: 1, To: 1, Type: msgBeat})
|
|
|
|
|
-
|
|
|
|
|
|
|
+ // trigger a snapshot
|
|
|
|
|
+ nt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{}}})
|
|
|
follower := nt.peers[3].(*raft)
|
|
follower := nt.peers[3].(*raft)
|
|
|
if !reflect.DeepEqual(follower.raftLog.snapshot, lead.raftLog.snapshot) {
|
|
if !reflect.DeepEqual(follower.raftLog.snapshot, lead.raftLog.snapshot) {
|
|
|
t.Errorf("follower.snap = %+v, want %+v", follower.raftLog.snapshot, lead.raftLog.snapshot)
|
|
t.Errorf("follower.snap = %+v, want %+v", follower.raftLog.snapshot, lead.raftLog.snapshot)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- committed := follower.raftLog.lastIndex()
|
|
|
|
|
|
|
+ // trigger a commit
|
|
|
nt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{}}})
|
|
nt.send(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{}}})
|
|
|
- if follower.raftLog.committed != committed+1 {
|
|
|
|
|
- t.Errorf("follower.comitted = %d, want %d", follower.raftLog.committed, committed+1)
|
|
|
|
|
|
|
+ if follower.raftLog.committed != lead.raftLog.committed {
|
|
|
|
|
+ t.Errorf("follower.comitted = %d, want %d", follower.raftLog.committed, lead.raftLog.committed)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|