Browse Source

raft: add tests based on section 5.3 in raft paper

Yicheng Qin 11 years ago
parent
commit
421d5fbe72
2 changed files with 402 additions and 0 deletions
  1. 3 0
      raft/log.go
  2. 399 0
      raft/raft_paper_test.go

+ 3 - 0
raft/log.go

@@ -41,6 +41,9 @@ func newLog() *raftLog {
 }
 
 func (l *raftLog) load(ents []pb.Entry) {
+	if l.offset != ents[0].Index {
+		panic("entries loaded don't match offset index")
+	}
 	l.ents = ents
 	l.unstable = l.offset + uint64(len(ents))
 }

+ 399 - 0
raft/raft_paper_test.go

@@ -379,8 +379,407 @@ func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) {
 	}
 }
 
+// TestLeaderStartReplication tests that when receiving client proposals,
+// the leader appends the proposal to its log as a new entry, then issues
+// AppendEntries RPCs in parallel to each of the other servers to replicate
+// the entry. Also, when sending an AppendEntries RPC, the leader includes
+// the index and term of the entry in its log that immediately precedes
+// the new entries.
+// Also, it writes the new entry into stable storage.
+// Reference: section 5.3
+func TestLeaderStartReplication(t *testing.T) {
+	r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
+	r.becomeCandidate()
+	r.becomeLeader()
+	commitNoopEntry(r)
+	li := r.raftLog.lastIndex()
+
+	ents := []pb.Entry{{Data: []byte("some data")}}
+	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: ents})
+
+	if g := r.raftLog.lastIndex(); g != li+1 {
+		t.Errorf("lastIndex = %d, want %d", g, li+1)
+	}
+	if g := r.raftLog.committed; g != li {
+		t.Errorf("committed = %d, want %d", g, li)
+	}
+	msgs := r.readMessages()
+	sort.Sort(messageSlice(msgs))
+	wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}
+	wmsgs := []pb.Message{
+		{From: 1, To: 2, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li},
+		{From: 1, To: 3, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li},
+	}
+	if !reflect.DeepEqual(msgs, wmsgs) {
+		t.Errorf("msgs = %+v, want %+v", msgs, wmsgs)
+	}
+	if g := r.raftLog.unstableEnts(); !reflect.DeepEqual(g, wents) {
+		t.Errorf("ents = %+v, want %+v", g, wents)
+	}
+}
+
+// TestLeaderCommitEntry tests that when the entry has been safely replicated,
+// the leader gives out the applied entries, which can be applied to its state
+// machine.
+// Also, the leader keeps track of the highest index it knows to be committed,
+// and it includes that index in future AppendEntries RPCs so that the other
+// servers eventually find out.
+// Reference: section 5.3
+func TestLeaderCommitEntry(t *testing.T) {
+	r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
+	r.becomeCandidate()
+	r.becomeLeader()
+	commitNoopEntry(r)
+	li := r.raftLog.lastIndex()
+	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
+
+	for _, m := range r.readMessages() {
+		r.Step(acceptAndReply(m))
+	}
+
+	if g := r.raftLog.committed; g != li+1 {
+		t.Errorf("committed = %d, want %d", g, li+1)
+	}
+	wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}
+	if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
+		t.Errorf("nextEnts = %+v, want %+v", g, wents)
+	}
+	msgs := r.readMessages()
+	sort.Sort(messageSlice(msgs))
+	for i, m := range msgs {
+		if w := uint64(i + 2); m.To != w {
+			t.Errorf("to = %x, want %x", m.To, w)
+		}
+		if m.Type != pb.MsgApp {
+			t.Errorf("type = %s, want %s", m.Type, pb.MsgApp)
+		}
+		if m.Commit != li+1 {
+			t.Errorf("commit = %d, want %d", m.Commit, li+1)
+		}
+	}
+}
+
+// TestLeaderAcknowledgeCommit tests that a log entry is committed once the
+// leader that created the entry has replicated it on a majority of the servers.
+// Reference: section 5.3
+func TestLeaderAcknowledgeCommit(t *testing.T) {
+	tests := []struct {
+		size      int
+		acceptors map[uint64]bool
+		wack      bool
+	}{
+		{1, nil, true},
+		{3, nil, false},
+		{3, map[uint64]bool{2: true}, true},
+		{3, map[uint64]bool{2: true, 3: true}, true},
+		{5, nil, false},
+		{5, map[uint64]bool{2: true}, false},
+		{5, map[uint64]bool{2: true, 3: true}, true},
+		{5, map[uint64]bool{2: true, 3: true, 4: true}, true},
+		{5, map[uint64]bool{2: true, 3: true, 4: true, 5: true}, true},
+	}
+	for i, tt := range tests {
+		r := newRaft(1, idsBySize(tt.size), 10, 1)
+		r.becomeCandidate()
+		r.becomeLeader()
+		commitNoopEntry(r)
+		li := r.raftLog.lastIndex()
+		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
+
+		for _, m := range r.readMessages() {
+			if tt.acceptors[m.To] {
+				r.Step(acceptAndReply(m))
+			}
+		}
+
+		if g := r.raftLog.committed > li; g != tt.wack {
+			t.Errorf("#%d: ack commit = %v, want %v", i, g, tt.wack)
+		}
+	}
+}
+
+// TestLeaderCommitPrecedingEntries tests that when leader commits a log entry,
+// it also commits all preceding entries in the leader’s log, including
+// entries created by previous leaders.
+// Also, it applies the entry to its local state machine (in log order).
+// Reference: section 5.3
+func TestLeaderCommitPrecedingEntries(t *testing.T) {
+	tests := [][]pb.Entry{
+		{},
+		{{Term: 2, Index: 1}},
+		{{Term: 1, Index: 1}, {Term: 2, Index: 2}},
+		{{Term: 1, Index: 1}},
+	}
+	for i, tt := range tests {
+		r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
+		r.loadEnts(append([]pb.Entry{{}}, tt...))
+		r.loadState(pb.HardState{Term: 2})
+		r.becomeCandidate()
+		r.becomeLeader()
+		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
+
+		for _, m := range r.readMessages() {
+			r.Step(acceptAndReply(m))
+		}
+
+		li := uint64(len(tt))
+		wents := append(tt, pb.Entry{Term: 3, Index: li + 1}, pb.Entry{Term: 3, Index: li + 2, Data: []byte("some data")})
+		if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
+			t.Errorf("#%d: ents = %+v, want %+v", i, g, wents)
+		}
+	}
+}
+
+// TestFollowerCommitEntry tests that once a follower learns that a log entry
+// is committed, it applies the entry to its local state machine (in log order).
+// Reference: section 5.3
+func TestFollowerCommitEntry(t *testing.T) {
+	tests := []struct {
+		ents   []pb.Entry
+		commit uint64
+	}{
+		{
+			[]pb.Entry{
+				{Term: 1, Index: 1, Data: []byte("some data")},
+			},
+			1,
+		},
+		{
+			[]pb.Entry{
+				{Term: 1, Index: 1, Data: []byte("some data")},
+				{Term: 1, Index: 2, Data: []byte("some data2")},
+			},
+			2,
+		},
+		{
+			[]pb.Entry{
+				{Term: 1, Index: 1, Data: []byte("some data2")},
+				{Term: 1, Index: 2, Data: []byte("some data")},
+			},
+			2,
+		},
+		{
+			[]pb.Entry{
+				{Term: 1, Index: 1, Data: []byte("some data")},
+				{Term: 1, Index: 2, Data: []byte("some data2")},
+			},
+			1,
+		},
+	}
+	for i, tt := range tests {
+		r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
+		r.becomeFollower(1, 2)
+
+		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 1, Entries: tt.ents, Commit: tt.commit})
+
+		if g := r.raftLog.committed; g != tt.commit {
+			t.Errorf("#%d: committed = %d, want %d", i, g, tt.commit)
+		}
+		wents := tt.ents[:int(tt.commit)]
+		if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
+			t.Errorf("#%d: nextEnts = %v, want %v", i, g, wents)
+		}
+	}
+}
+
+// TestFollowerCheckMsgApp tests that if the follower does not find an
+// entry in its log with the same index and term as the one in AppendEntries RPC,
+// then it refuses the new entries. Otherwise it replies that it accepts the
+// append entries.
+// Reference: section 5.3
+func TestFollowerCheckMsgApp(t *testing.T) {
+	ents := []pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}}
+	tests := []struct {
+		term    uint64
+		index   uint64
+		wreject bool
+	}{
+		{ents[0].Term, ents[0].Index, false},
+		{ents[1].Term, ents[1].Index, false},
+		{ents[2].Term, ents[2].Index, false},
+		{ents[1].Term, ents[1].Index + 1, true},
+		{ents[1].Term + 1, ents[1].Index, true},
+		{3, 3, true},
+	}
+	for i, tt := range tests {
+		r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
+		r.loadEnts(ents)
+		r.loadState(pb.HardState{Commit: 2})
+		r.becomeFollower(2, 2)
+
+		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index})
+
+		msgs := r.readMessages()
+		wmsgs := []pb.Message{
+			{From: 1, To: 2, Type: pb.MsgAppResp, Term: 2, Index: tt.index, Reject: tt.wreject},
+		}
+		if !reflect.DeepEqual(msgs, wmsgs) {
+			t.Errorf("#%d: msgs = %+v, want %+v", i, msgs, wmsgs)
+		}
+	}
+}
+
+// TestFollowerAppendEntries tests that when AppendEntries RPC is valid,
+// the follower will delete the existing conflict entry and all that follow it,
+// and append any new entries not already in the log.
+// Also, it writes the new entry into stable storage.
+// Reference: section 5.3
+func TestFollowerAppendEntries(t *testing.T) {
+	tests := []struct {
+		index, term uint64
+		ents        []pb.Entry
+		wents       []pb.Entry
+		wunstable   []pb.Entry
+	}{
+		{
+			2, 2,
+			[]pb.Entry{{Term: 3, Index: 3}},
+			[]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}},
+			[]pb.Entry{{Term: 3, Index: 3}},
+		},
+		{
+			1, 1,
+			[]pb.Entry{{Term: 3, Index: 3}, {Term: 4, Index: 4}},
+			[]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 3, Index: 3}, {Term: 4, Index: 4}},
+			[]pb.Entry{{Term: 3, Index: 3}, {Term: 4, Index: 4}},
+		},
+		{
+			0, 0,
+			[]pb.Entry{{Term: 1, Index: 1}},
+			[]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}},
+			nil,
+		},
+		{
+			0, 0,
+			[]pb.Entry{{Term: 3, Index: 3}},
+			[]pb.Entry{{}, {Term: 3, Index: 3}},
+			[]pb.Entry{{Term: 3, Index: 3}},
+		},
+	}
+	for i, tt := range tests {
+		r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
+		r.loadEnts([]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}})
+		r.becomeFollower(2, 2)
+
+		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents})
+
+		if g := r.raftLog.ents; !reflect.DeepEqual(g, tt.wents) {
+			t.Errorf("#%d: ents = %+v, want %+v", i, g, tt.wents)
+		}
+		if g := r.raftLog.unstableEnts(); !reflect.DeepEqual(g, tt.wunstable) {
+			t.Errorf("#%d: unstableEnts = %+v, want %+v", i, g, tt.wunstable)
+		}
+	}
+}
+
+// TestLeaderSyncFollowerLog tests that the leader could bring a follower's log
+// into consistency with its own.
+// Reference: section 5.3, figure 7
+func TestLeaderSyncFollowerLog(t *testing.T) {
+	ents := []pb.Entry{
+		{},
+		{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
+		{Term: 4, Index: 4}, {Term: 4, Index: 5},
+		{Term: 5, Index: 6}, {Term: 5, Index: 7},
+		{Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10},
+	}
+	term := uint64(8)
+	tests := [][]pb.Entry{
+		{
+			{},
+			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
+			{Term: 4, Index: 4}, {Term: 4, Index: 5},
+			{Term: 5, Index: 6}, {Term: 5, Index: 7},
+			{Term: 6, Index: 8}, {Term: 6, Index: 9},
+		},
+		{
+			{},
+			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
+			{Term: 4, Index: 4},
+		},
+		{
+			{},
+			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
+			{Term: 4, Index: 4}, {Term: 4, Index: 5},
+			{Term: 5, Index: 6}, {Term: 5, Index: 7},
+			{Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10}, {Term: 6, Index: 11},
+		},
+		{
+			{},
+			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
+			{Term: 4, Index: 4}, {Term: 4, Index: 5},
+			{Term: 5, Index: 6}, {Term: 5, Index: 7},
+			{Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10},
+			{Term: 7, Index: 11}, {Term: 7, Index: 12},
+		},
+		{
+			{},
+			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
+			{Term: 4, Index: 4}, {Term: 4, Index: 5}, {Term: 4, Index: 6}, {Term: 4, Index: 7},
+		},
+		{
+			{},
+			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
+			{Term: 2, Index: 4}, {Term: 2, Index: 5}, {Term: 2, Index: 6},
+			{Term: 3, Index: 7}, {Term: 3, Index: 8}, {Term: 3, Index: 9}, {Term: 3, Index: 10}, {Term: 3, Index: 11},
+		},
+	}
+	for i, tt := range tests {
+		lead := newRaft(1, []uint64{1, 2, 3}, 10, 1)
+		lead.loadEnts(ents)
+		lead.loadState(pb.HardState{Commit: lead.raftLog.lastIndex(), Term: term})
+		follower := newRaft(2, []uint64{1, 2, 3}, 10, 1)
+		follower.loadEnts(tt)
+		follower.loadState(pb.HardState{Term: term - 1})
+		// It is necessary to have a three-node cluster.
+		// The second may have more up-to-date log than the first one, so the
+		// first node needs the vote from the third node to become the leader.
+		n := newNetwork(lead, follower, nopStepper)
+		n.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+		n.send(pb.Message{From: 3, To: 1, Type: pb.MsgVoteResp, Term: 1})
+
+		n.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
+
+		if g := diffu(ltoa(lead.raftLog), ltoa(follower.raftLog)); g != "" {
+			t.Errorf("#%d: log diff:\n%s", i, g)
+		}
+	}
+}
+
 type messageSlice []pb.Message
 
 func (s messageSlice) Len() int           { return len(s) }
 func (s messageSlice) Less(i, j int) bool { return fmt.Sprint(s[i]) < fmt.Sprint(s[j]) }
 func (s messageSlice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
+
+func commitNoopEntry(r *raft) {
+	if r.state != StateLeader {
+		panic("it should only be used when it is the leader")
+	}
+	r.bcastAppend()
+	// simulate the response of MsgApp
+	msgs := r.readMessages()
+	for _, m := range msgs {
+		if m.Type != pb.MsgApp || len(m.Entries) != 1 || m.Entries[0].Data != nil {
+			panic("not a message to append noop entry")
+		}
+		r.Step(acceptAndReply(m))
+	}
+	// ignore further messages to refresh followers' commmit index
+	r.readMessages()
+	r.raftLog.resetNextEnts()
+	r.raftLog.resetUnstable()
+}
+
+func acceptAndReply(m pb.Message) pb.Message {
+	if m.Type != pb.MsgApp {
+		panic("type should be MsgApp")
+	}
+	return pb.Message{
+		From:  m.To,
+		To:    m.From,
+		Term:  m.Term,
+		Type:  pb.MsgAppResp,
+		Index: m.Index + uint64(len(m.Entries)),
+	}
+}