|
- // Copyright 2015 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- /*
- This file contains tests which verify that the scenarios described
- in the raft paper (https://ramcloud.stanford.edu/raft.pdf) are
- handled by the raft implementation correctly. Each test focuses on
- several sentences written in the paper. This could help us to prevent
- most implementation bugs.
- Each test is composed of three parts: init, test and check.
- Init part uses simple and understandable way to simulate the init state.
- Test part uses Step function to generate the scenario. Check part checks
- outgoing messages and state.
- */
- package raft
- import (
- "fmt"
- "testing"
- "reflect"
- "sort"
- pb "github.com/coreos/etcd/raft/raftpb"
- )
- func TestFollowerUpdateTermFromMessage(t *testing.T) {
- testUpdateTermFromMessage(t, StateFollower)
- }
- func TestCandidateUpdateTermFromMessage(t *testing.T) {
- testUpdateTermFromMessage(t, StateCandidate)
- }
- func TestLeaderUpdateTermFromMessage(t *testing.T) {
- testUpdateTermFromMessage(t, StateLeader)
- }
- // testUpdateTermFromMessage tests that if one server’s current term is
- // smaller than the other’s, then it updates its current term to the larger
- // value. If a candidate or leader discovers that its term is out of date,
- // it immediately reverts to follower state.
- // Reference: section 5.1
- func testUpdateTermFromMessage(t *testing.T, state StateType) {
- r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
- switch state {
- case StateFollower:
- r.becomeFollower(1, 2)
- case StateCandidate:
- r.becomeCandidate()
- case StateLeader:
- r.becomeCandidate()
- r.becomeLeader()
- }
- r.Step(pb.Message{Type: pb.MsgApp, Term: 2})
- if r.Term != 2 {
- t.Errorf("term = %d, want %d", r.Term, 2)
- }
- if r.state != StateFollower {
- t.Errorf("state = %v, want %v", r.state, StateFollower)
- }
- }
- // TestRejectStaleTermMessage tests that if a server receives a request with
- // a stale term number, it rejects the request.
- // Our implementation ignores the request instead.
- // Reference: section 5.1
- func TestRejectStaleTermMessage(t *testing.T) {
- called := false
- fakeStep := func(r *raft, m pb.Message) {
- called = true
- }
- r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
- r.step = fakeStep
- r.loadState(pb.HardState{Term: 2})
- r.Step(pb.Message{Type: pb.MsgApp, Term: r.Term - 1})
- if called {
- t.Errorf("stepFunc called = %v, want %v", called, false)
- }
- }
- // TestStartAsFollower tests that when servers start up, they begin as followers.
- // Reference: section 5.2
- func TestStartAsFollower(t *testing.T) {
- r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
- if r.state != StateFollower {
- t.Errorf("state = %s, want %s", r.state, StateFollower)
- }
- }
- // TestLeaderBcastBeat tests that if the leader receives a heartbeat tick,
- // it will send a msgApp with m.Index = 0, m.LogTerm=0 and empty entries as
- // heartbeat to all followers.
- // Reference: section 5.2
- func TestLeaderBcastBeat(t *testing.T) {
- // heartbeat interval
- hi := 1
- r := newTestRaft(1, []uint64{1, 2, 3}, 10, hi, NewMemoryStorage())
- r.becomeCandidate()
- r.becomeLeader()
- for i := 0; i < 10; i++ {
- r.appendEntry(pb.Entry{Index: uint64(i) + 1})
- }
- for i := 0; i < hi; i++ {
- r.tick()
- }
- msgs := r.readMessages()
- sort.Sort(messageSlice(msgs))
- wmsgs := []pb.Message{
- {From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat},
- {From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat},
- }
- if !reflect.DeepEqual(msgs, wmsgs) {
- t.Errorf("msgs = %v, want %v", msgs, wmsgs)
- }
- }
- func TestFollowerStartElection(t *testing.T) {
- testNonleaderStartElection(t, StateFollower)
- }
- func TestCandidateStartNewElection(t *testing.T) {
- testNonleaderStartElection(t, StateCandidate)
- }
- // testNonleaderStartElection tests that if a follower receives no communication
- // over election timeout, it begins an election to choose a new leader. It
- // increments its current term and transitions to candidate state. It then
- // votes for itself and issues RequestVote RPCs in parallel to each of the
- // other servers in the cluster.
- // Reference: section 5.2
- // Also if a candidate fails to obtain a majority, it will time out and
- // start a new election by incrementing its term and initiating another
- // round of RequestVote RPCs.
- // Reference: section 5.2
- func testNonleaderStartElection(t *testing.T, state StateType) {
- // election timeout
- et := 10
- r := newTestRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage())
- switch state {
- case StateFollower:
- r.becomeFollower(1, 2)
- case StateCandidate:
- r.becomeCandidate()
- }
- for i := 1; i < 2*et; i++ {
- r.tick()
- }
- if r.Term != 2 {
- t.Errorf("term = %d, want 2", r.Term)
- }
- if r.state != StateCandidate {
- t.Errorf("state = %s, want %s", r.state, StateCandidate)
- }
- if !r.votes[r.id] {
- t.Errorf("vote for self = false, want true")
- }
- msgs := r.readMessages()
- sort.Sort(messageSlice(msgs))
- wmsgs := []pb.Message{
- {From: 1, To: 2, Term: 2, Type: pb.MsgVote},
- {From: 1, To: 3, Term: 2, Type: pb.MsgVote},
- }
- if !reflect.DeepEqual(msgs, wmsgs) {
- t.Errorf("msgs = %v, want %v", msgs, wmsgs)
- }
- }
- // TestLeaderElectionInOneRoundRPC tests all cases that may happen in
- // leader election during one round of RequestVote RPC:
- // a) it wins the election
- // b) it loses the election
- // c) it is unclear about the result
- // Reference: section 5.2
- func TestLeaderElectionInOneRoundRPC(t *testing.T) {
- tests := []struct {
- size int
- votes map[uint64]bool
- state StateType
- }{
- // win the election when receiving votes from a majority of the servers
- {1, map[uint64]bool{}, StateLeader},
- {3, map[uint64]bool{2: true, 3: true}, StateLeader},
- {3, map[uint64]bool{2: true}, StateLeader},
- {5, map[uint64]bool{2: true, 3: true, 4: true, 5: true}, StateLeader},
- {5, map[uint64]bool{2: true, 3: true, 4: true}, StateLeader},
- {5, map[uint64]bool{2: true, 3: true}, StateLeader},
- // return to follower state if it receives vote denial from a majority
- {3, map[uint64]bool{2: false, 3: false}, StateFollower},
- {5, map[uint64]bool{2: false, 3: false, 4: false, 5: false}, StateFollower},
- {5, map[uint64]bool{2: true, 3: false, 4: false, 5: false}, StateFollower},
- // stay in candidate if it does not obtain the majority
- {3, map[uint64]bool{}, StateCandidate},
- {5, map[uint64]bool{2: true}, StateCandidate},
- {5, map[uint64]bool{2: false, 3: false}, StateCandidate},
- {5, map[uint64]bool{}, StateCandidate},
- }
- for i, tt := range tests {
- r := newTestRaft(1, idsBySize(tt.size), 10, 1, NewMemoryStorage())
- r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
- for id, vote := range tt.votes {
- r.Step(pb.Message{From: id, To: 1, Type: pb.MsgVoteResp, Reject: !vote})
- }
- if r.state != tt.state {
- t.Errorf("#%d: state = %s, want %s", i, r.state, tt.state)
- }
- if g := r.Term; g != 1 {
- t.Errorf("#%d: term = %d, want %d", i, g, 1)
- }
- }
- }
- // TestFollowerVote tests that each follower will vote for at most one
- // candidate in a given term, on a first-come-first-served basis.
- // Reference: section 5.2
- func TestFollowerVote(t *testing.T) {
- tests := []struct {
- vote uint64
- nvote uint64
- wreject bool
- }{
- {None, 1, false},
- {None, 2, false},
- {1, 1, false},
- {2, 2, false},
- {1, 2, true},
- {2, 1, true},
- }
- for i, tt := range tests {
- r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
- r.loadState(pb.HardState{Term: 1, Vote: tt.vote})
- r.Step(pb.Message{From: tt.nvote, To: 1, Term: 1, Type: pb.MsgVote})
- msgs := r.readMessages()
- wmsgs := []pb.Message{
- {From: 1, To: tt.nvote, Term: 1, Type: pb.MsgVoteResp, Reject: tt.wreject},
- }
- if !reflect.DeepEqual(msgs, wmsgs) {
- t.Errorf("#%d: msgs = %v, want %v", i, msgs, wmsgs)
- }
- }
- }
- // TestCandidateFallback tests that while waiting for votes,
- // if a candidate receives an AppendEntries RPC from another server claiming
- // to be leader whose term is at least as large as the candidate's current term,
- // it recognizes the leader as legitimate and returns to follower state.
- // Reference: section 5.2
- func TestCandidateFallback(t *testing.T) {
- tests := []pb.Message{
- {From: 2, To: 1, Term: 1, Type: pb.MsgApp},
- {From: 2, To: 1, Term: 2, Type: pb.MsgApp},
- }
- for i, tt := range tests {
- r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
- r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
- if r.state != StateCandidate {
- t.Fatalf("unexpected state = %s, want %s", r.state, StateCandidate)
- }
- r.Step(tt)
- if g := r.state; g != StateFollower {
- t.Errorf("#%d: state = %s, want %s", i, g, StateFollower)
- }
- if g := r.Term; g != tt.Term {
- t.Errorf("#%d: term = %d, want %d", i, g, tt.Term)
- }
- }
- }
- func TestFollowerElectionTimeoutRandomized(t *testing.T) {
- SetLogger(discardLogger)
- defer SetLogger(defaultLogger)
- testNonleaderElectionTimeoutRandomized(t, StateFollower)
- }
- func TestCandidateElectionTimeoutRandomized(t *testing.T) {
- SetLogger(discardLogger)
- defer SetLogger(defaultLogger)
- testNonleaderElectionTimeoutRandomized(t, StateCandidate)
- }
- // testNonleaderElectionTimeoutRandomized tests that election timeout for
- // follower or candidate is randomized.
- // Reference: section 5.2
- func testNonleaderElectionTimeoutRandomized(t *testing.T, state StateType) {
- et := 10
- r := newTestRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage())
- timeouts := make(map[int]bool)
- for round := 0; round < 50*et; round++ {
- switch state {
- case StateFollower:
- r.becomeFollower(r.Term+1, 2)
- case StateCandidate:
- r.becomeCandidate()
- }
- time := 0
- for len(r.readMessages()) == 0 {
- r.tick()
- time++
- }
- timeouts[time] = true
- }
- for d := et + 1; d < 2*et; d++ {
- if !timeouts[d] {
- t.Errorf("timeout in %d ticks should happen", d)
- }
- }
- }
- func TestFollowersElectioinTimeoutNonconflict(t *testing.T) {
- SetLogger(discardLogger)
- defer SetLogger(defaultLogger)
- testNonleadersElectionTimeoutNonconflict(t, StateFollower)
- }
- func TestCandidatesElectionTimeoutNonconflict(t *testing.T) {
- SetLogger(discardLogger)
- defer SetLogger(defaultLogger)
- testNonleadersElectionTimeoutNonconflict(t, StateCandidate)
- }
- // testNonleadersElectionTimeoutNonconflict tests that in most cases only a
- // single server(follower or candidate) will time out, which reduces the
- // likelihood of split vote in the new election.
- // Reference: section 5.2
- func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) {
- et := 10
- size := 5
- rs := make([]*raft, size)
- ids := idsBySize(size)
- for k := range rs {
- rs[k] = newTestRaft(ids[k], ids, et, 1, NewMemoryStorage())
- }
- conflicts := 0
- for round := 0; round < 1000; round++ {
- for _, r := range rs {
- switch state {
- case StateFollower:
- r.becomeFollower(r.Term+1, None)
- case StateCandidate:
- r.becomeCandidate()
- }
- }
- timeoutNum := 0
- for timeoutNum == 0 {
- for _, r := range rs {
- r.tick()
- if len(r.readMessages()) > 0 {
- timeoutNum++
- }
- }
- }
- // several rafts time out at the same tick
- if timeoutNum > 1 {
- conflicts++
- }
- }
- if g := float64(conflicts) / 1000; g > 0.3 {
- t.Errorf("probability of conflicts = %v, want <= 0.3", g)
- }
- }
- // TestLeaderStartReplication tests that when receiving client proposals,
- // the leader appends the proposal to its log as a new entry, then issues
- // AppendEntries RPCs in parallel to each of the other servers to replicate
- // the entry. Also, when sending an AppendEntries RPC, the leader includes
- // the index and term of the entry in its log that immediately precedes
- // the new entries.
- // Also, it writes the new entry into stable storage.
- // Reference: section 5.3
- func TestLeaderStartReplication(t *testing.T) {
- s := NewMemoryStorage()
- r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s)
- r.becomeCandidate()
- r.becomeLeader()
- commitNoopEntry(r, s)
- li := r.raftLog.lastIndex()
- ents := []pb.Entry{{Data: []byte("some data")}}
- r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: ents})
- if g := r.raftLog.lastIndex(); g != li+1 {
- t.Errorf("lastIndex = %d, want %d", g, li+1)
- }
- if g := r.raftLog.committed; g != li {
- t.Errorf("committed = %d, want %d", g, li)
- }
- msgs := r.readMessages()
- sort.Sort(messageSlice(msgs))
- wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}
- wmsgs := []pb.Message{
- {From: 1, To: 2, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li},
- {From: 1, To: 3, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li},
- }
- if !reflect.DeepEqual(msgs, wmsgs) {
- t.Errorf("msgs = %+v, want %+v", msgs, wmsgs)
- }
- if g := r.raftLog.unstableEntries(); !reflect.DeepEqual(g, wents) {
- t.Errorf("ents = %+v, want %+v", g, wents)
- }
- }
- // TestLeaderCommitEntry tests that when the entry has been safely replicated,
- // the leader gives out the applied entries, which can be applied to its state
- // machine.
- // Also, the leader keeps track of the highest index it knows to be committed,
- // and it includes that index in future AppendEntries RPCs so that the other
- // servers eventually find out.
- // Reference: section 5.3
- func TestLeaderCommitEntry(t *testing.T) {
- s := NewMemoryStorage()
- r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s)
- r.becomeCandidate()
- r.becomeLeader()
- commitNoopEntry(r, s)
- li := r.raftLog.lastIndex()
- r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
- for _, m := range r.readMessages() {
- r.Step(acceptAndReply(m))
- }
- if g := r.raftLog.committed; g != li+1 {
- t.Errorf("committed = %d, want %d", g, li+1)
- }
- wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}
- if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
- t.Errorf("nextEnts = %+v, want %+v", g, wents)
- }
- msgs := r.readMessages()
- sort.Sort(messageSlice(msgs))
- for i, m := range msgs {
- if w := uint64(i + 2); m.To != w {
- t.Errorf("to = %x, want %x", m.To, w)
- }
- if m.Type != pb.MsgApp {
- t.Errorf("type = %v, want %v", m.Type, pb.MsgApp)
- }
- if m.Commit != li+1 {
- t.Errorf("commit = %d, want %d", m.Commit, li+1)
- }
- }
- }
- // TestLeaderAcknowledgeCommit tests that a log entry is committed once the
- // leader that created the entry has replicated it on a majority of the servers.
- // Reference: section 5.3
- func TestLeaderAcknowledgeCommit(t *testing.T) {
- tests := []struct {
- size int
- acceptors map[uint64]bool
- wack bool
- }{
- {1, nil, true},
- {3, nil, false},
- {3, map[uint64]bool{2: true}, true},
- {3, map[uint64]bool{2: true, 3: true}, true},
- {5, nil, false},
- {5, map[uint64]bool{2: true}, false},
- {5, map[uint64]bool{2: true, 3: true}, true},
- {5, map[uint64]bool{2: true, 3: true, 4: true}, true},
- {5, map[uint64]bool{2: true, 3: true, 4: true, 5: true}, true},
- }
- for i, tt := range tests {
- s := NewMemoryStorage()
- r := newTestRaft(1, idsBySize(tt.size), 10, 1, s)
- r.becomeCandidate()
- r.becomeLeader()
- commitNoopEntry(r, s)
- li := r.raftLog.lastIndex()
- r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
- for _, m := range r.readMessages() {
- if tt.acceptors[m.To] {
- r.Step(acceptAndReply(m))
- }
- }
- if g := r.raftLog.committed > li; g != tt.wack {
- t.Errorf("#%d: ack commit = %v, want %v", i, g, tt.wack)
- }
- }
- }
- // TestLeaderCommitPrecedingEntries tests that when leader commits a log entry,
- // it also commits all preceding entries in the leader’s log, including
- // entries created by previous leaders.
- // Also, it applies the entry to its local state machine (in log order).
- // Reference: section 5.3
- func TestLeaderCommitPrecedingEntries(t *testing.T) {
- tests := [][]pb.Entry{
- {},
- {{Term: 2, Index: 1}},
- {{Term: 1, Index: 1}, {Term: 2, Index: 2}},
- {{Term: 1, Index: 1}},
- }
- for i, tt := range tests {
- storage := NewMemoryStorage()
- storage.Append(tt)
- r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
- r.loadState(pb.HardState{Term: 2})
- r.becomeCandidate()
- r.becomeLeader()
- r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
- for _, m := range r.readMessages() {
- r.Step(acceptAndReply(m))
- }
- li := uint64(len(tt))
- wents := append(tt, pb.Entry{Term: 3, Index: li + 1}, pb.Entry{Term: 3, Index: li + 2, Data: []byte("some data")})
- if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
- t.Errorf("#%d: ents = %+v, want %+v", i, g, wents)
- }
- }
- }
- // TestFollowerCommitEntry tests that once a follower learns that a log entry
- // is committed, it applies the entry to its local state machine (in log order).
- // Reference: section 5.3
- func TestFollowerCommitEntry(t *testing.T) {
- tests := []struct {
- ents []pb.Entry
- commit uint64
- }{
- {
- []pb.Entry{
- {Term: 1, Index: 1, Data: []byte("some data")},
- },
- 1,
- },
- {
- []pb.Entry{
- {Term: 1, Index: 1, Data: []byte("some data")},
- {Term: 1, Index: 2, Data: []byte("some data2")},
- },
- 2,
- },
- {
- []pb.Entry{
- {Term: 1, Index: 1, Data: []byte("some data2")},
- {Term: 1, Index: 2, Data: []byte("some data")},
- },
- 2,
- },
- {
- []pb.Entry{
- {Term: 1, Index: 1, Data: []byte("some data")},
- {Term: 1, Index: 2, Data: []byte("some data2")},
- },
- 1,
- },
- }
- for i, tt := range tests {
- r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
- r.becomeFollower(1, 2)
- r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 1, Entries: tt.ents, Commit: tt.commit})
- if g := r.raftLog.committed; g != tt.commit {
- t.Errorf("#%d: committed = %d, want %d", i, g, tt.commit)
- }
- wents := tt.ents[:int(tt.commit)]
- if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
- t.Errorf("#%d: nextEnts = %v, want %v", i, g, wents)
- }
- }
- }
- // TestFollowerCheckMsgApp tests that if the follower does not find an
- // entry in its log with the same index and term as the one in AppendEntries RPC,
- // then it refuses the new entries. Otherwise it replies that it accepts the
- // append entries.
- // Reference: section 5.3
- func TestFollowerCheckMsgApp(t *testing.T) {
- ents := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
- tests := []struct {
- term uint64
- index uint64
- windex uint64
- wreject bool
- wrejectHint uint64
- }{
- // match with committed entries
- {0, 0, 1, false, 0},
- {ents[0].Term, ents[0].Index, 1, false, 0},
- // match with uncommitted entries
- {ents[1].Term, ents[1].Index, 2, false, 0},
- // unmatch with existing entry
- {ents[0].Term, ents[1].Index, ents[1].Index, true, 2},
- // unexisting entry
- {ents[1].Term + 1, ents[1].Index + 1, ents[1].Index + 1, true, 2},
- }
- for i, tt := range tests {
- storage := NewMemoryStorage()
- storage.Append(ents)
- r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
- r.loadState(pb.HardState{Commit: 1})
- r.becomeFollower(2, 2)
- r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index})
- msgs := r.readMessages()
- wmsgs := []pb.Message{
- {From: 1, To: 2, Type: pb.MsgAppResp, Term: 2, Index: tt.windex, Reject: tt.wreject, RejectHint: tt.wrejectHint},
- }
- if !reflect.DeepEqual(msgs, wmsgs) {
- t.Errorf("#%d: msgs = %+v, want %+v", i, msgs, wmsgs)
- }
- }
- }
- // TestFollowerAppendEntries tests that when AppendEntries RPC is valid,
- // the follower will delete the existing conflict entry and all that follow it,
- // and append any new entries not already in the log.
- // Also, it writes the new entry into stable storage.
- // Reference: section 5.3
- func TestFollowerAppendEntries(t *testing.T) {
- tests := []struct {
- index, term uint64
- ents []pb.Entry
- wents []pb.Entry
- wunstable []pb.Entry
- }{
- {
- 2, 2,
- []pb.Entry{{Term: 3, Index: 3}},
- []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}},
- []pb.Entry{{Term: 3, Index: 3}},
- },
- {
- 1, 1,
- []pb.Entry{{Term: 3, Index: 2}, {Term: 4, Index: 3}},
- []pb.Entry{{Term: 1, Index: 1}, {Term: 3, Index: 2}, {Term: 4, Index: 3}},
- []pb.Entry{{Term: 3, Index: 2}, {Term: 4, Index: 3}},
- },
- {
- 0, 0,
- []pb.Entry{{Term: 1, Index: 1}},
- []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}},
- nil,
- },
- {
- 0, 0,
- []pb.Entry{{Term: 3, Index: 1}},
- []pb.Entry{{Term: 3, Index: 1}},
- []pb.Entry{{Term: 3, Index: 1}},
- },
- }
- for i, tt := range tests {
- storage := NewMemoryStorage()
- storage.Append([]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}})
- r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
- r.becomeFollower(2, 2)
- r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents})
- if g := r.raftLog.allEntries(); !reflect.DeepEqual(g, tt.wents) {
- t.Errorf("#%d: ents = %+v, want %+v", i, g, tt.wents)
- }
- if g := r.raftLog.unstableEntries(); !reflect.DeepEqual(g, tt.wunstable) {
- t.Errorf("#%d: unstableEnts = %+v, want %+v", i, g, tt.wunstable)
- }
- }
- }
- // TestLeaderSyncFollowerLog tests that the leader could bring a follower's log
- // into consistency with its own.
- // Reference: section 5.3, figure 7
- func TestLeaderSyncFollowerLog(t *testing.T) {
- ents := []pb.Entry{
- {},
- {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
- {Term: 4, Index: 4}, {Term: 4, Index: 5},
- {Term: 5, Index: 6}, {Term: 5, Index: 7},
- {Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10},
- }
- term := uint64(8)
- tests := [][]pb.Entry{
- {
- {},
- {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
- {Term: 4, Index: 4}, {Term: 4, Index: 5},
- {Term: 5, Index: 6}, {Term: 5, Index: 7},
- {Term: 6, Index: 8}, {Term: 6, Index: 9},
- },
- {
- {},
- {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
- {Term: 4, Index: 4},
- },
- {
- {},
- {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
- {Term: 4, Index: 4}, {Term: 4, Index: 5},
- {Term: 5, Index: 6}, {Term: 5, Index: 7},
- {Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10}, {Term: 6, Index: 11},
- },
- {
- {},
- {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
- {Term: 4, Index: 4}, {Term: 4, Index: 5},
- {Term: 5, Index: 6}, {Term: 5, Index: 7},
- {Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10},
- {Term: 7, Index: 11}, {Term: 7, Index: 12},
- },
- {
- {},
- {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
- {Term: 4, Index: 4}, {Term: 4, Index: 5}, {Term: 4, Index: 6}, {Term: 4, Index: 7},
- },
- {
- {},
- {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
- {Term: 2, Index: 4}, {Term: 2, Index: 5}, {Term: 2, Index: 6},
- {Term: 3, Index: 7}, {Term: 3, Index: 8}, {Term: 3, Index: 9}, {Term: 3, Index: 10}, {Term: 3, Index: 11},
- },
- }
- for i, tt := range tests {
- leadStorage := NewMemoryStorage()
- leadStorage.Append(ents)
- lead := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, leadStorage)
- lead.loadState(pb.HardState{Commit: lead.raftLog.lastIndex(), Term: term})
- followerStorage := NewMemoryStorage()
- followerStorage.Append(tt)
- follower := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, followerStorage)
- follower.loadState(pb.HardState{Term: term - 1})
- // It is necessary to have a three-node cluster.
- // The second may have more up-to-date log than the first one, so the
- // first node needs the vote from the third node to become the leader.
- n := newNetwork(lead, follower, nopStepper)
- n.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
- // The election occurs in the term after the one we loaded with
- // lead.loadState above.
- n.send(pb.Message{From: 3, To: 1, Type: pb.MsgVoteResp, Term: term + 1})
- n.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
- if g := diffu(ltoa(lead.raftLog), ltoa(follower.raftLog)); g != "" {
- t.Errorf("#%d: log diff:\n%s", i, g)
- }
- }
- }
- // TestVoteRequest tests that the vote request includes information about the candidate’s log
- // and are sent to all of the other nodes.
- // Reference: section 5.4.1
- func TestVoteRequest(t *testing.T) {
- tests := []struct {
- ents []pb.Entry
- wterm uint64
- }{
- {[]pb.Entry{{Term: 1, Index: 1}}, 2},
- {[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}, 3},
- }
- for j, tt := range tests {
- r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
- r.Step(pb.Message{
- From: 2, To: 1, Type: pb.MsgApp, Term: tt.wterm - 1, LogTerm: 0, Index: 0, Entries: tt.ents,
- })
- r.readMessages()
- for i := 1; i < r.electionTimeout*2; i++ {
- r.tickElection()
- }
- msgs := r.readMessages()
- sort.Sort(messageSlice(msgs))
- if len(msgs) != 2 {
- t.Fatalf("#%d: len(msg) = %d, want %d", j, len(msgs), 2)
- }
- for i, m := range msgs {
- if m.Type != pb.MsgVote {
- t.Errorf("#%d: msgType = %d, want %d", i, m.Type, pb.MsgVote)
- }
- if m.To != uint64(i+2) {
- t.Errorf("#%d: to = %d, want %d", i, m.To, i+2)
- }
- if m.Term != tt.wterm {
- t.Errorf("#%d: term = %d, want %d", i, m.Term, tt.wterm)
- }
- windex, wlogterm := tt.ents[len(tt.ents)-1].Index, tt.ents[len(tt.ents)-1].Term
- if m.Index != windex {
- t.Errorf("#%d: index = %d, want %d", i, m.Index, windex)
- }
- if m.LogTerm != wlogterm {
- t.Errorf("#%d: logterm = %d, want %d", i, m.LogTerm, wlogterm)
- }
- }
- }
- }
- // TestVoter tests the voter denies its vote if its own log is more up-to-date
- // than that of the candidate.
- // Reference: section 5.4.1
- func TestVoter(t *testing.T) {
- tests := []struct {
- ents []pb.Entry
- logterm uint64
- index uint64
- wreject bool
- }{
- // same logterm
- {[]pb.Entry{{Term: 1, Index: 1}}, 1, 1, false},
- {[]pb.Entry{{Term: 1, Index: 1}}, 1, 2, false},
- {[]pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true},
- // candidate higher logterm
- {[]pb.Entry{{Term: 1, Index: 1}}, 2, 1, false},
- {[]pb.Entry{{Term: 1, Index: 1}}, 2, 2, false},
- {[]pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}}, 2, 1, false},
- // voter higher logterm
- {[]pb.Entry{{Term: 2, Index: 1}}, 1, 1, true},
- {[]pb.Entry{{Term: 2, Index: 1}}, 1, 2, true},
- {[]pb.Entry{{Term: 2, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true},
- }
- for i, tt := range tests {
- storage := NewMemoryStorage()
- storage.Append(tt.ents)
- r := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
- r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVote, Term: 3, LogTerm: tt.logterm, Index: tt.index})
- msgs := r.readMessages()
- if len(msgs) != 1 {
- t.Fatalf("#%d: len(msg) = %d, want %d", i, len(msgs), 1)
- }
- m := msgs[0]
- if m.Type != pb.MsgVoteResp {
- t.Errorf("#%d: msgType = %d, want %d", i, m.Type, pb.MsgVoteResp)
- }
- if m.Reject != tt.wreject {
- t.Errorf("#%d: reject = %t, want %t", i, m.Reject, tt.wreject)
- }
- }
- }
- // TestLeaderOnlyCommitsLogFromCurrentTerm tests that only log entries from the leader’s
- // current term are committed by counting replicas.
- // Reference: section 5.4.2
- func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) {
- ents := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
- tests := []struct {
- index uint64
- wcommit uint64
- }{
- // do not commit log entries in previous terms
- {1, 0},
- {2, 0},
- // commit log in current term
- {3, 3},
- }
- for i, tt := range tests {
- storage := NewMemoryStorage()
- storage.Append(ents)
- r := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
- r.loadState(pb.HardState{Term: 2})
- // become leader at term 3
- r.becomeCandidate()
- r.becomeLeader()
- r.readMessages()
- // propose a entry to current term
- r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
- r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Term: r.Term, Index: tt.index})
- if r.raftLog.committed != tt.wcommit {
- t.Errorf("#%d: commit = %d, want %d", i, r.raftLog.committed, tt.wcommit)
- }
- }
- }
- type messageSlice []pb.Message
- func (s messageSlice) Len() int { return len(s) }
- func (s messageSlice) Less(i, j int) bool { return fmt.Sprint(s[i]) < fmt.Sprint(s[j]) }
- func (s messageSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
- func commitNoopEntry(r *raft, s *MemoryStorage) {
- if r.state != StateLeader {
- panic("it should only be used when it is the leader")
- }
- r.bcastAppend()
- // simulate the response of MsgApp
- msgs := r.readMessages()
- for _, m := range msgs {
- if m.Type != pb.MsgApp || len(m.Entries) != 1 || m.Entries[0].Data != nil {
- panic("not a message to append noop entry")
- }
- r.Step(acceptAndReply(m))
- }
- // ignore further messages to refresh followers' commit index
- r.readMessages()
- s.Append(r.raftLog.unstableEntries())
- r.raftLog.appliedTo(r.raftLog.committed)
- r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm())
- }
- func acceptAndReply(m pb.Message) pb.Message {
- if m.Type != pb.MsgApp {
- panic("type should be MsgApp")
- }
- return pb.Message{
- From: m.To,
- To: m.From,
- Term: m.Term,
- Type: pb.MsgAppResp,
- Index: m.Index + uint64(len(m.Entries)),
- }
- }
|