Browse Source

raft: fix log append; add tests

Xiang Li 11 years ago
parent
commit
447d7dc51b
4 changed files with 140 additions and 3 deletions
  1. 28 2
      raft/log.go
  2. 54 0
      raft/log_test.go
  3. 2 1
      raft/node_test.go
  4. 56 0
      raft/raft_test.go

+ 28 - 2
raft/log.go

@@ -45,8 +45,18 @@ func newLog() *log {
 
 func (l *log) maybeAppend(index, logTerm, committed int64, ents ...Entry) bool {
 	if l.matchTerm(index, logTerm) {
-		l.append(index, ents...)
-		l.committed = committed
+		from := index + 1
+		ci := l.findConflict(from, ents)
+		switch {
+		case ci == -1:
+		case ci <= l.committed:
+			panic("conflict with committed entry")
+		default:
+			l.append(ci-1, ents[ci-from:]...)
+		}
+		if l.committed < committed {
+			l.committed = min(committed, l.lastIndex())
+		}
 		return true
 	}
 	return false
@@ -57,6 +67,15 @@ func (l *log) append(after int64, ents ...Entry) int64 {
 	return l.lastIndex()
 }
 
+func (l *log) findConflict(from int64, ents []Entry) int64 {
+	for i, ne := range ents {
+		if oe := l.at(from + int64(i)); oe == nil || oe.Term != ne.Term {
+			return from + int64(i)
+		}
+	}
+	return -1
+}
+
 func (l *log) lastIndex() int64 {
 	return int64(len(l.ents)) - 1 + l.offset
 }
@@ -156,3 +175,10 @@ func (l *log) isOutOfBounds(i int64) bool {
 	}
 	return false
 }
+
+func min(a, b int64) int64 {
+	if a > b {
+		return b
+	}
+	return a
+}

+ 54 - 0
raft/log_test.go

@@ -5,6 +5,60 @@ import (
 	"testing"
 )
 
+// TestAppend ensures:
+// 1. If an existing entry conflicts with a new one (same index
+// but different terms), delete the existing entry and all that
+// follow it
+// 2.Append any new entries not already in the log
+func TestAppend(t *testing.T) {
+	previousEnts := []Entry{{Term: 1}, {Term: 2}}
+	tests := []struct {
+		after  int64
+		ents   []Entry
+		windex int64
+		wents  []Entry
+	}{
+		{
+			2,
+			[]Entry{},
+			2,
+			[]Entry{{Term: 1}, {Term: 2}},
+		},
+		{
+			2,
+			[]Entry{{Term: 2}},
+			3,
+			[]Entry{{Term: 1}, {Term: 2}, {Term: 2}},
+		},
+		// conflicts with index 1
+		{
+			0,
+			[]Entry{{Term: 2}},
+			1,
+			[]Entry{{Term: 2}},
+		},
+		// conflicts with index 2
+		{
+			1,
+			[]Entry{{Term: 3}, {Term: 3}},
+			3,
+			[]Entry{{Term: 1}, {Term: 3}, {Term: 3}},
+		},
+	}
+
+	for i, tt := range tests {
+		log := newLog()
+		log.ents = append(log.ents, previousEnts...)
+		index := log.append(tt.after, tt.ents...)
+		if index != tt.windex {
+			t.Errorf("#%d: lastIndex = %d, want %d", i, index, tt.windex)
+		}
+		if g := log.entries(1); !reflect.DeepEqual(g, tt.wents) {
+			t.Errorf("#%d: logEnts = %+v, want %+v", i, g, tt.wents)
+		}
+	}
+}
+
 // TestCompactionSideEffects ensures that all the log related funcationality works correctly after
 // a compaction.
 func TestCompactionSideEffects(t *testing.T) {

+ 2 - 1
raft/node_test.go

@@ -72,13 +72,14 @@ func TestResetElapse(t *testing.T) {
 	}{
 		{Message{From: 0, To: 1, Type: msgApp, Term: 2, Entries: []Entry{{Term: 1}}}, 0},
 		{Message{From: 0, To: 1, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}}, 1},
-		{Message{From: 0, To: 1, Type: msgVote, Term: 2}, 0},
+		{Message{From: 0, To: 1, Type: msgVote, Term: 2, Index: 1, LogTerm: 1}, 0},
 		{Message{From: 0, To: 1, Type: msgVote, Term: 1}, 1},
 	}
 
 	for i, tt := range tests {
 		n := New(0, defaultHeartbeat, defaultElection)
 		n.sm = newStateMachine(0, []int64{0, 1, 2})
+		n.sm.log.append(0, Entry{Type: Normal, Term: 1})
 		n.sm.term = 2
 		n.sm.log.committed = 1
 

+ 56 - 0
raft/raft_test.go

@@ -445,6 +445,62 @@ func TestCommit(t *testing.T) {
 	}
 }
 
+// TestHandleMsgApp ensures:
+// 1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm.
+// 2. If an existing entry conflicts with a new one (same index but different terms),
+//    delete the existing entry and all that follow it; append any new entries not already in the log.
+// 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry).
+func TestHandleMsgApp(t *testing.T) {
+	tests := []struct {
+		m       Message
+		wIndex  int64
+		wCommit int64
+		wAccept bool
+	}{
+		// Ensure 1
+		{Message{Type: msgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, false}, // previous log mismatch
+		{Message{Type: msgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, false}, // previous log non-exist
+
+		// Ensure 2
+		{Message{Type: msgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, true},
+		{Message{Type: msgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []Entry{{Term: 2}}}, 1, 1, true},
+		{Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []Entry{{Term: 2}, {Term: 2}}}, 4, 3, true},
+		{Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []Entry{{Term: 2}}}, 3, 3, true},
+		{Message{Type: msgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []Entry{{Term: 2}}}, 2, 2, true},
+
+		// Ensure 3
+		{Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 2}, 2, 2, true},
+		{Message{Type: msgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, true}, // commit upto min(commit, last)
+	}
+
+	for i, tt := range tests {
+		sm := &stateMachine{
+			state: stateFollower,
+			term:  2,
+			log:   &log{committed: 0, ents: []Entry{{}, {Term: 1}, {Term: 2}}},
+		}
+
+		sm.handleAppendEntries(tt.m)
+		if sm.log.lastIndex() != tt.wIndex {
+			t.Errorf("#%d: lastIndex = %d, want %d", i, sm.log.lastIndex(), tt.wIndex)
+		}
+		if sm.log.committed != tt.wCommit {
+			t.Errorf("#%d: committed = %d, want %d", i, sm.log.committed, tt.wCommit)
+		}
+		m := sm.Msgs()
+		if len(m) != 1 {
+			t.Errorf("#%d: msg = nil, want 1")
+		}
+		gaccept := true
+		if m[0].Index == -1 {
+			gaccept = false
+		}
+		if gaccept != tt.wAccept {
+			t.Errorf("#%d: accept = %v, want %v", gaccept, tt.wAccept)
+		}
+	}
+}
+
 func TestRecvMsgVote(t *testing.T) {
 	tests := []struct {
 		state   stateType