Browse Source

raft: leader commit and test

Signed-off-by: Blake Mizerany <blake.mizerany@gmail.com>
Xiang Li 11 years ago
parent
commit
74737b76cc
2 changed files with 84 additions and 18 deletions
  1. 17 9
      raft/raft.go
  2. 67 9
      raft/raft_test.go

+ 17 - 9
raft/raft.go

@@ -104,7 +104,8 @@ type stateMachine struct {
 
 	state stateType
 
-	commit int
+	commit  int
+	applied int
 
 	votes map[int]bool
 
@@ -180,11 +181,12 @@ func (sm *stateMachine) sendAppend() {
 		m.Index = in.next - 1
 		m.LogTerm = sm.log[in.next-1].Term
 		m.Entries = sm.log[in.next:]
+		m.Commit = sm.commit
 		sm.send(m)
 	}
 }
 
-func (sm *stateMachine) theN() int {
+func (sm *stateMachine) maybeCommit() bool {
 	// TODO(bmizerany): optimize.. Currently naive
 	mis := make([]int, len(sm.ins))
 	for i := range mis {
@@ -192,18 +194,20 @@ func (sm *stateMachine) theN() int {
 	}
 	sort.Sort(sort.Reverse(sort.IntSlice(mis)))
 	mci := mis[sm.q()-1]
-	if sm.log[mci].Term == sm.term {
-		return mci
+
+	if mci > sm.commit && sm.log[mci].Term == sm.term {
+		sm.commit = mci
+		return true
 	}
 
-	return -1
+	return false
 }
 
+// nextEnts returns the appliable entries and updates the applied index
 func (sm *stateMachine) nextEnts() (ents []Entry) {
-	ci := sm.theN()
-	if ci > sm.commit {
-		ents = sm.log[sm.commit+1 : ci]
-		sm.commit = ci
+	if sm.commit > sm.applied {
+		ents = sm.log[sm.applied+1 : sm.commit+1]
+		sm.applied = sm.commit
 	}
 	return ents
 }
@@ -306,6 +310,7 @@ func (sm *stateMachine) Step(m Message) {
 
 	handleAppendEntries := func() {
 		if sm.isLogOk(m.Index, m.LogTerm) {
+			sm.commit = m.Commit
 			sm.append(m.Index, m.Entries...)
 			sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.li()})
 		} else {
@@ -323,6 +328,9 @@ func (sm *stateMachine) Step(m Message) {
 				sm.sendAppend()
 			} else {
 				in.update(m.Index)
+				if sm.maybeCommit() {
+					sm.sendAppend()
+				}
 			}
 		}
 	case stateCandidate:

+ 67 - 9
raft/raft_test.go

@@ -1,6 +1,7 @@
 package raft
 
 import (
+	"bytes"
 	"fmt"
 	"reflect"
 	"testing"
@@ -56,6 +57,63 @@ func TestLeaderElection(t *testing.T) {
 	}
 }
 
+func TestLogReplication(t *testing.T) {
+	tests := []struct {
+		*network
+		msgs    []Message
+		wcommit int
+	}{
+		{
+			newNetwork(nil, nil, nil),
+			[]Message{
+				Message{To: 0, Type: msgProp, Data: []byte("somedata")},
+			},
+			1,
+		},
+		{
+			newNetwork(nil, nil, nil),
+			[]Message{
+				Message{To: 0, Type: msgProp, Data: []byte("somedata")},
+				Message{To: 1, Type: msgHup},
+				Message{To: 1, Type: msgProp, Data: []byte("somedata")},
+			},
+			2,
+		},
+	}
+
+	for i, tt := range tests {
+		tt.tee = stepperFunc(func(m Message) {
+			t.Logf("#%d: m = %+v", i, m)
+		})
+		tt.Step(Message{To: 0, Type: msgHup})
+
+		for _, m := range tt.msgs {
+			tt.Step(m)
+		}
+
+		for j, ism := range tt.ss {
+			sm := ism.(*nsm)
+
+			if sm.commit != tt.wcommit {
+				t.Errorf("#%d.%d: commit = %d, want %d", i, j, sm.commit, tt.wcommit)
+			}
+
+			ents := sm.nextEnts()
+			props := make([]Message, 0)
+			for _, m := range tt.msgs {
+				if m.Type == msgProp {
+					props = append(props, m)
+				}
+			}
+			for k, m := range props {
+				if !bytes.Equal(ents[k].Data, m.Data) {
+					t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Data)
+				}
+			}
+		}
+	}
+}
+
 func TestDualingCandidates(t *testing.T) {
 	a := &nsm{stateMachine{log: defaultLog}, nil}
 	c := &nsm{stateMachine{log: defaultLog}, nil}
@@ -242,7 +300,7 @@ func TestProposalByProxy(t *testing.T) {
 	}
 }
 
-func TestTheN(t *testing.T) {
+func TestCommit(t *testing.T) {
 	tests := []struct {
 		matches []int
 		logs    []Entry
@@ -251,17 +309,17 @@ func TestTheN(t *testing.T) {
 	}{
 		// odd
 		{[]int{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
-		{[]int{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, -1},
+		{[]int{2, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
 		{[]int{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
-		{[]int{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, -1},
+		{[]int{2, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
 
 		// even
 		{[]int{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
-		{[]int{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, -1},
+		{[]int{2, 1, 1, 1}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
 		{[]int{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 1, 1},
-		{[]int{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, -1},
+		{[]int{2, 1, 1, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
 		{[]int{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 2}}, 2, 2},
-		{[]int{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, -1},
+		{[]int{2, 1, 2, 2}, []Entry{{}, {Term: 1}, {Term: 1}}, 2, 0},
 	}
 
 	for i, tt := range tests {
@@ -270,9 +328,9 @@ func TestTheN(t *testing.T) {
 			ins[j] = &index{tt.matches[j], tt.matches[j] + 1}
 		}
 		sm := &stateMachine{log: tt.logs, ins: ins, k: len(ins), term: tt.smTerm}
-		g := sm.theN()
-		if g != tt.w {
-			t.Errorf("#%d: theN = %d, want %d", i, g, tt.w)
+		sm.maybeCommit()
+		if g := sm.commit; g != tt.w {
+			t.Errorf("#%d: commit = %d, want %d", i, g, tt.w)
 		}
 	}
 }