Browse Source

raft: send out a Normal entry with nil Data when a leader is elected

Yicheng Qin 11 years ago
parent
commit
62a90e77b3
3 changed files with 139 additions and 57 deletions
  1. 2 2
      raft/node_test.go
  2. 10 5
      raft/raft.go
  3. 127 50
      raft/raft_test.go

+ 2 - 2
raft/node_test.go

@@ -39,7 +39,7 @@ func TestTickMsgBeat(t *testing.T) {
 		n.Add(i, "")
 		for _, m := range n.Msgs() {
 			if m.Type == msgApp {
-				n.Step(Message{From: m.To, Type: msgAppResp, Index: i + 1})
+				n.Step(Message{From: m.To, Type: msgAppResp, Index: m.Index + len(m.Entries)})
 			}
 		}
 		// ignore commit index update messages
@@ -129,7 +129,7 @@ func TestRemove(t *testing.T) {
 	n.Add(1, "")
 	n.Next()
 	n.Remove(0)
-	n.Step(Message{Type: msgAppResp, From: 1, Term: 1, Index: 3})
+	n.Step(Message{Type: msgAppResp, From: 1, Term: 1, Index: 4})
 	n.Next()
 
 	if len(n.sm.ins) != 1 {

+ 10 - 5
raft/raft.go

@@ -198,6 +198,13 @@ func (sm *stateMachine) q() int {
 	return len(sm.ins)/2 + 1
 }
 
+func (sm *stateMachine) appendEntry(e Entry) {
+	e.Term = sm.term
+	sm.log.append(sm.log.lastIndex(), e)
+	sm.ins[sm.id].update(sm.log.lastIndex())
+	sm.maybeCommit()
+}
+
 // promotable indicates whether state machine could be promoted.
 // New machine has to wait for the first log entry to be committed, or it will
 // always start as a one-node cluster.
@@ -236,6 +243,8 @@ func (sm *stateMachine) becomeLeader() {
 			sm.pendingConf = true
 		}
 	}
+
+	sm.appendEntry(Entry{Type: Normal, Data: nil})
 }
 
 func (sm *stateMachine) Msgs() []Message {
@@ -310,11 +319,7 @@ func stepLeader(sm *stateMachine, m Message) bool {
 			}
 			sm.pendingConf = true
 		}
-		e.Term = sm.term
-
-		sm.log.append(sm.log.lastIndex(), e)
-		sm.ins[sm.id].update(sm.log.lastIndex())
-		sm.maybeCommit()
+		sm.appendEntry(e)
 		sm.bcastAppend()
 	case msgAppResp:
 		if m.Index < 0 {

+ 127 - 50
raft/raft_test.go

@@ -47,7 +47,7 @@ func TestLogReplication(t *testing.T) {
 			[]Message{
 				{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}},
 			},
-			1,
+			2,
 		},
 		{
 			newNetwork(nil, nil, nil),
@@ -57,7 +57,7 @@ func TestLogReplication(t *testing.T) {
 				{To: 1, Type: msgHup},
 				{To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}},
 			},
-			2,
+			4,
 		},
 	}
 
@@ -75,7 +75,12 @@ func TestLogReplication(t *testing.T) {
 				t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.log.committed, tt.wcommitted)
 			}
 
-			ents := sm.nextEnts()
+			ents := make([]Entry, 0)
+			for _, e := range sm.nextEnts() {
+				if e.Data != nil {
+					ents = append(ents, e)
+				}
+			}
 			props := make([]Message, 0)
 			for _, m := range tt.msgs {
 				if m.Type == msgProp {
@@ -98,11 +103,14 @@ func TestSingleNodeCommit(t *testing.T) {
 	tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
 
 	sm := tt.peers[0].(*stateMachine)
-	if sm.log.committed != 2 {
-		t.Errorf("committed = %d, want %d", sm.log.committed, 2)
+	if sm.log.committed != 3 {
+		t.Errorf("committed = %d, want %d", sm.log.committed, 3)
 	}
 }
 
+// TestCannotCommitWithoutNewTermEntry tests that entries cannot be committed
+// when the leader changes, no new proposal comes in, and the ChangeTerm
+// proposal is filtered.
 func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
 	tt := newNetwork(nil, nil, nil, nil, nil)
 	tt.send(Message{To: 0, Type: msgHup})
@@ -116,52 +124,99 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
 	tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
 
 	sm := tt.peers[0].(*stateMachine)
-	if sm.log.committed != 0 {
-		t.Errorf("committed = %d, want %d", sm.log.committed, 0)
+	if sm.log.committed != 1 {
+		t.Errorf("committed = %d, want %d", sm.log.committed, 1)
 	}
 
 	// network recovery
 	tt.recover()
+	// avoid committing ChangeTerm proposal
+	tt.ignore(msgApp)
 
 	// elect 1 as the new leader with term 2
 	tt.send(Message{To: 1, Type: msgHup})
-	// send out a heartbeat
-	tt.send(Message{To: 1, Type: msgBeat})
 
 	// no log entries from previous term should be committed
 	sm = tt.peers[1].(*stateMachine)
-	if sm.log.committed != 0 {
-		t.Errorf("committed = %d, want %d", sm.log.committed, 0)
+	if sm.log.committed != 1 {
+		t.Errorf("committed = %d, want %d", sm.log.committed, 1)
 	}
 
-	// after append a entry from the current term, all entries
+	tt.recover()
+
+	// send out a heartbeat
+	// after append a ChangeTerm entry from the current term, all entries
 	// should be committed
+	tt.send(Message{To: 1, Type: msgBeat})
+
+	if sm.log.committed != 4 {
+		t.Errorf("committed = %d, want %d", sm.log.committed, 4)
+	}
+
+	// still be able to append a entry
 	tt.send(Message{To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
-	if sm.log.committed != 3 {
-		t.Errorf("committed = %d, want %d", sm.log.committed, 3)
+
+	if sm.log.committed != 5 {
+		t.Errorf("committed = %d, want %d", sm.log.committed, 5)
+	}
+}
+
+// TestCommitWithoutNewTermEntry tests that entries can be committed when the
+// leader changes and no new proposal comes in.
+func TestCommitWithoutNewTermEntry(t *testing.T) {
+	tt := newNetwork(nil, nil, nil, nil, nil)
+	tt.send(Message{To: 0, Type: msgHup})
+
+	// 0 cannot reach 2,3,4
+	tt.cut(0, 2)
+	tt.cut(0, 3)
+	tt.cut(0, 4)
+
+	tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
+	tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
+
+	sm := tt.peers[0].(*stateMachine)
+	if sm.log.committed != 1 {
+		t.Errorf("committed = %d, want %d", sm.log.committed, 1)
+	}
+
+	// network recovery
+	tt.recover()
+
+	// elect 1 as the new leader with term 2
+	// after append a ChangeTerm entry from the current term, all entries
+	// should be committed
+	tt.send(Message{To: 1, Type: msgHup})
+
+	if sm.log.committed != 4 {
+		t.Errorf("committed = %d, want %d", sm.log.committed, 4)
 	}
 }
 
 func TestDuelingCandidates(t *testing.T) {
 	a := newStateMachine(0, nil) // k, id are set later
+	b := newStateMachine(0, nil)
 	c := newStateMachine(0, nil)
 
-	tt := newNetwork(a, nil, c)
-	tt.cut(0, 2)
+	nt := newNetwork(a, b, c)
+	nt.cut(0, 2)
 
-	tt.send(Message{To: 0, Type: msgHup})
-	tt.send(Message{To: 2, Type: msgHup})
+	nt.send(Message{To: 0, Type: msgHup})
+	nt.send(Message{To: 2, Type: msgHup})
 
-	tt.recover()
-	tt.send(Message{To: 2, Type: msgHup})
+	nt.recover()
+	nt.send(Message{To: 2, Type: msgHup})
 
+	wlog := &log{ents: []Entry{{}, Entry{Type: Normal, Data: nil, Term: 1}}, committed: 1}
 	tests := []struct {
 		sm    *stateMachine
 		state stateType
 		term  int
+		log   *log
 	}{
-		{a, stateFollower, 2},
-		{c, stateLeader, 2},
+		{a, stateFollower, 2, wlog},
+		{b, stateFollower, 2, wlog},
+		{c, stateFollower, 2, newLog()},
 	}
 
 	for i, tt := range tests {
@@ -171,11 +226,8 @@ func TestDuelingCandidates(t *testing.T) {
 		if g := tt.sm.term; g != tt.term {
 			t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
 		}
-	}
-
-	base := ltoa(newLog())
-	for i, p := range tt.peers {
-		if sm, ok := p.(*stateMachine); ok {
+		base := ltoa(tt.log)
+		if sm, ok := nt.peers[i].(*stateMachine); ok {
 			l := ltoa(sm.log)
 			if g := diffu(base, l); g != "" {
 				t.Errorf("#%d: diff:\n%s", i, g)
@@ -207,7 +259,7 @@ func TestCandidateConcede(t *testing.T) {
 	if g := a.term; g != 1 {
 		t.Errorf("term = %d, want %d", g, 1)
 	}
-	wantLog := ltoa(&log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1})
+	wantLog := ltoa(&log{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1}, {Term: 1, Data: data}}, committed: 2})
 	for i, p := range tt.peers {
 		if sm, ok := p.(*stateMachine); ok {
 			l := ltoa(sm.log)
@@ -239,7 +291,14 @@ func TestOldMessages(t *testing.T) {
 	// pretend we're an old leader trying to make progress
 	tt.send(Message{To: 0, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}})
 
-	base := ltoa(newLog())
+	l := &log{
+		ents: []Entry{
+			{}, {Type: Normal, Data: nil, Term: 1},
+			{Type: Normal, Data: nil, Term: 2}, {Type: Normal, Data: nil, Term: 3},
+		},
+		committed: 3,
+	}
+	base := ltoa(l)
 	for i, p := range tt.peers {
 		if sm, ok := p.(*stateMachine); ok {
 			l := ltoa(sm.log)
@@ -289,7 +348,7 @@ func TestProposal(t *testing.T) {
 
 		wantLog := newLog()
 		if tt.success {
-			wantLog = &log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1}
+			wantLog = &log{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1}, {Term: 1, Data: data}}, committed: 2}
 		}
 		base := ltoa(wantLog)
 		for i, p := range tt.peers {
@@ -323,7 +382,7 @@ func TestProposalByProxy(t *testing.T) {
 		// propose via follower
 		tt.send(Message{To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}})
 
-		wantLog := &log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1}
+		wantLog := &log{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1}, {Term: 1, Data: data}}, committed: 2}
 		base := ltoa(wantLog)
 		for i, p := range tt.peers {
 			if sm, ok := p.(*stateMachine); ok {
@@ -496,19 +555,19 @@ func TestConf(t *testing.T) {
 	sm.becomeLeader()
 
 	sm.Step(Message{Type: msgProp, Entries: []Entry{{Type: AddNode}}})
-	if sm.log.lastIndex() != 1 {
+	if sm.log.lastIndex() != 2 {
 		t.Errorf("lastindex = %d, want %d", sm.log.lastIndex(), 1)
 	}
 	if !sm.pendingConf {
 		t.Errorf("pendingConf = %v, want %v", sm.pendingConf, true)
 	}
-	if sm.log.ents[1].Type != AddNode {
+	if sm.log.ents[2].Type != AddNode {
 		t.Errorf("type = %d, want %d", sm.log.ents[1].Type, AddNode)
 	}
 
 	// deny the second configuration change request if there is a pending one
 	sm.Step(Message{Type: msgProp, Entries: []Entry{{Type: AddNode}}})
-	if sm.log.lastIndex() != 1 {
+	if sm.log.lastIndex() != 2 {
 		t.Errorf("lastindex = %d, want %d", sm.log.lastIndex(), 1)
 	}
 }
@@ -539,20 +598,24 @@ func TestConfChangeLeader(t *testing.T) {
 }
 
 func TestAllServerStepdown(t *testing.T) {
-	tests := []stateType{stateFollower, stateCandidate, stateLeader}
-
-	want := struct {
+	tests := []struct {
 		state stateType
-		term  int
-		index int
-	}{stateFollower, 3, 1}
+
+		wstate stateType
+		wterm  int
+		windex int
+	}{
+		{stateFollower, stateFollower, 3, 1},
+		{stateCandidate, stateFollower, 3, 1},
+		{stateLeader, stateFollower, 3, 2},
+	}
 
 	tmsgTypes := [...]messageType{msgVote, msgApp}
 	tterm := 3
 
 	for i, tt := range tests {
 		sm := newStateMachine(0, []int{0, 1, 2})
-		switch tt {
+		switch tt.state {
 		case stateFollower:
 			sm.becomeFollower(1, 0)
 		case stateCandidate:
@@ -565,14 +628,14 @@ func TestAllServerStepdown(t *testing.T) {
 		for j, msgType := range tmsgTypes {
 			sm.Step(Message{Type: msgType, Term: tterm, LogTerm: tterm})
 
-			if sm.state != want.state {
-				t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, want.state)
+			if sm.state != tt.wstate {
+				t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate)
 			}
-			if sm.term != want.term {
-				t.Errorf("#%d.%d term = %v , want %v", i, j, sm.term, want.term)
+			if sm.term != tt.wterm {
+				t.Errorf("#%d.%d term = %v , want %v", i, j, sm.term, tt.wterm)
 			}
-			if len(sm.log.ents) != want.index {
-				t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.log.ents), want.index)
+			if len(sm.log.ents) != tt.windex {
+				t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.log.ents), tt.windex)
 			}
 		}
 	}
@@ -596,6 +659,7 @@ func TestLeaderAppResp(t *testing.T) {
 		sm.log = &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}}
 		sm.becomeCandidate()
 		sm.becomeLeader()
+		sm.Msgs()
 		sm.Step(Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.term})
 		msgs := sm.Msgs()
 
@@ -656,8 +720,9 @@ func ents(terms ...int) *stateMachine {
 }
 
 type network struct {
-	peers map[int]Interface
-	dropm map[connem]float64
+	peers   map[int]Interface
+	dropm   map[connem]float64
+	ignorem map[messageType]bool
 }
 
 // newNetwork initializes a network from peers.
@@ -692,7 +757,11 @@ func newNetwork(peers ...Interface) *network {
 			npeers[id] = v
 		}
 	}
-	return &network{peers: npeers, dropm: make(map[connem]float64)}
+	return &network{
+		peers:   npeers,
+		dropm:   make(map[connem]float64),
+		ignorem: make(map[messageType]bool),
+	}
 }
 
 func (nw *network) send(msgs ...Message) {
@@ -722,13 +791,21 @@ func (nw *network) isolate(id int) {
 	}
 }
 
+func (nw *network) ignore(t messageType) {
+	nw.ignorem[t] = true
+}
+
 func (nw *network) recover() {
 	nw.dropm = make(map[connem]float64)
+	nw.ignorem = make(map[messageType]bool)
 }
 
 func (nw *network) filter(msgs []Message) []Message {
 	mm := make([]Message, 0)
 	for _, m := range msgs {
+		if nw.ignorem[m.Type] {
+			continue
+		}
 		switch m.Type {
 		case msgHup:
 			// hups never go over the network, so don't drop them but panic