Browse Source

raft: use From field to distinguish local message

Yicheng Qin 11 years ago
parent
commit
79689872af
4 changed files with 56 additions and 59 deletions
  1. 1 1
      raft/cluster_test.go
  2. 8 11
      raft/node.go
  3. 4 4
      raft/node_test.go
  4. 43 43
      raft/raft_test.go

+ 1 - 1
raft/cluster_test.go

@@ -83,7 +83,7 @@ func TestBasicCluster(t *testing.T) {
 		for j := 0; j < tt.round; j++ {
 			for _, n := range nodes {
 				data := []byte{byte(n.Id())}
-				nt.send(Message{Type: msgProp, To: n.Id(), Entries: []Entry{{Data: data}}})
+				nt.send(Message{From: n.Id(), To: n.Id(), Type: msgProp, Entries: []Entry{{Data: data}}})
 
 				base := nodes[0].Next()
 				if len(base) != 1 {

+ 8 - 11
raft/node.go

@@ -4,7 +4,6 @@ import (
 	"encoding/json"
 	golog "log"
 	"math/rand"
-	"sync/atomic"
 	"time"
 )
 
@@ -51,9 +50,7 @@ func New(id int64, heartbeat, election tick) *Node {
 	return n
 }
 
-func (n *Node) Id() int64 {
-	return atomic.LoadInt64(&n.sm.id)
-}
+func (n *Node) Id() int64 { return n.sm.id }
 
 func (n *Node) Index() int64 { return n.sm.index.Get() }
 
@@ -73,10 +70,10 @@ func (n *Node) IsRemoved() bool { return n.removed }
 func (n *Node) Propose(data []byte) { n.propose(Normal, data) }
 
 func (n *Node) propose(t int64, data []byte) {
-	n.Step(Message{Type: msgProp, Entries: []Entry{{Type: t, Data: data}}})
+	n.Step(Message{From: n.sm.id, Type: msgProp, Entries: []Entry{{Type: t, Data: data}}})
 }
 
-func (n *Node) Campaign() { n.Step(Message{Type: msgHup}) }
+func (n *Node) Campaign() { n.Step(Message{From: n.sm.id, Type: msgHup}) }
 
 func (n *Node) Add(id int64, addr string, context []byte) {
 	n.UpdateConf(AddNode, &Config{NodeId: id, Addr: addr, Context: context})
@@ -91,11 +88,11 @@ func (n *Node) Step(m Message) bool {
 		n.removed = true
 		return false
 	}
-	if m.Term != 0 {
-		if _, ok := n.rmNodes[m.From]; ok {
-			n.sm.send(Message{To: m.From, Type: msgDenied})
-			return true
+	if _, ok := n.rmNodes[m.From]; ok {
+		if m.From != n.sm.id {
+			n.sm.send(Message{From: n.sm.id, To: m.From, Type: msgDenied})
 		}
+		return true
 	}
 
 	l := len(n.sm.msgs)
@@ -162,7 +159,7 @@ func (n *Node) Tick() {
 		timeout, msgType = n.heartbeat, msgBeat
 	}
 	if n.elapsed >= timeout {
-		n.Step(Message{Type: msgType})
+		n.Step(Message{From: n.sm.id, Type: msgType})
 		n.elapsed = 0
 		if n.sm.state != stateLeader {
 			n.electionRand = n.election + tick(rand.Int31())%n.election

+ 4 - 4
raft/node_test.go

@@ -14,7 +14,7 @@ func TestTickMsgHup(t *testing.T) {
 	n := New(0, defaultHeartbeat, defaultElection)
 	n.sm = newStateMachine(0, []int64{0, 1, 2})
 	// simulate to patch the join log
-	n.Step(Message{Type: msgApp, Commit: 1, Entries: []Entry{Entry{}}})
+	n.Step(Message{From: 1, Type: msgApp, Commit: 1, Entries: []Entry{Entry{}}})
 
 	for i := 0; i < defaultElection*2; i++ {
 		n.Tick()
@@ -155,15 +155,15 @@ func TestDenial(t *testing.T) {
 	}{
 		{
 			Entry{Type: AddNode, Term: 1, Data: []byte(`{"NodeId":2}`)},
-			map[int64]bool{0: false, 1: false, 2: false},
+			map[int64]bool{1: false, 2: false},
 		},
 		{
 			Entry{Type: RemoveNode, Term: 1, Data: []byte(`{"NodeId":1}`)},
-			map[int64]bool{0: false, 1: true, 2: true},
+			map[int64]bool{1: true, 2: true},
 		},
 		{
 			Entry{Type: RemoveNode, Term: 1, Data: []byte(`{"NodeId":0}`)},
-			map[int64]bool{0: true, 1: false, 2: true},
+			map[int64]bool{1: false, 2: true},
 		},
 	}
 

+ 43 - 43
raft/raft_test.go

@@ -28,7 +28,7 @@ func TestLeaderElection(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		tt.send(Message{To: 0, Type: msgHup})
+		tt.send(Message{From: 0, To: 0, Type: msgHup})
 		sm := tt.network.peers[0].(*stateMachine)
 		if sm.state != tt.state {
 			t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
@@ -48,7 +48,7 @@ func TestLogReplication(t *testing.T) {
 		{
 			newNetwork(nil, nil, nil),
 			[]Message{
-				{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}},
+				{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}},
 			},
 			2,
 		},
@@ -56,16 +56,16 @@ func TestLogReplication(t *testing.T) {
 			newNetwork(nil, nil, nil),
 			[]Message{
 
-				{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}},
-				{To: 1, Type: msgHup},
-				{To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}},
+				{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}},
+				{From: 0, To: 1, Type: msgHup},
+				{From: 0, To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}},
 			},
 			4,
 		},
 	}
 
 	for i, tt := range tests {
-		tt.send(Message{To: 0, Type: msgHup})
+		tt.send(Message{From: 0, To: 0, Type: msgHup})
 
 		for _, m := range tt.msgs {
 			tt.send(m)
@@ -101,9 +101,9 @@ func TestLogReplication(t *testing.T) {
 
 func TestSingleNodeCommit(t *testing.T) {
 	tt := newNetwork(nil)
-	tt.send(Message{To: 0, Type: msgHup})
-	tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
-	tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
+	tt.send(Message{From: 0, To: 0, Type: msgHup})
+	tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
+	tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
 
 	sm := tt.peers[0].(*stateMachine)
 	if sm.log.committed != 3 {
@@ -116,15 +116,15 @@ func TestSingleNodeCommit(t *testing.T) {
 // filtered.
 func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
 	tt := newNetwork(nil, nil, nil, nil, nil)
-	tt.send(Message{To: 0, Type: msgHup})
+	tt.send(Message{From: 0, To: 0, Type: msgHup})
 
 	// 0 cannot reach 2,3,4
 	tt.cut(0, 2)
 	tt.cut(0, 3)
 	tt.cut(0, 4)
 
-	tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
-	tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
+	tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
+	tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
 
 	sm := tt.peers[0].(*stateMachine)
 	if sm.log.committed != 1 {
@@ -137,7 +137,7 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
 	tt.ignore(msgApp)
 
 	// elect 1 as the new leader with term 2
-	tt.send(Message{To: 1, Type: msgHup})
+	tt.send(Message{From: 1, To: 1, Type: msgHup})
 
 	// no log entries from previous term should be committed
 	sm = tt.peers[1].(*stateMachine)
@@ -150,14 +150,14 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
 	// send out a heartbeat
 	// after append a ChangeTerm entry from the current term, all entries
 	// should be committed
-	tt.send(Message{To: 1, Type: msgBeat})
+	tt.send(Message{From: 1, To: 1, Type: msgBeat})
 
 	if sm.log.committed != 4 {
 		t.Errorf("committed = %d, want %d", sm.log.committed, 4)
 	}
 
 	// still be able to append a entry
-	tt.send(Message{To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
+	tt.send(Message{From: 1, To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
 
 	if sm.log.committed != 5 {
 		t.Errorf("committed = %d, want %d", sm.log.committed, 5)
@@ -168,15 +168,15 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
 // when leader changes, no new proposal comes in.
 func TestCommitWithoutNewTermEntry(t *testing.T) {
 	tt := newNetwork(nil, nil, nil, nil, nil)
-	tt.send(Message{To: 0, Type: msgHup})
+	tt.send(Message{From: 0, To: 0, Type: msgHup})
 
 	// 0 cannot reach 2,3,4
 	tt.cut(0, 2)
 	tt.cut(0, 3)
 	tt.cut(0, 4)
 
-	tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
-	tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
+	tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
+	tt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}})
 
 	sm := tt.peers[0].(*stateMachine)
 	if sm.log.committed != 1 {
@@ -189,7 +189,7 @@ func TestCommitWithoutNewTermEntry(t *testing.T) {
 	// elect 1 as the new leader with term 2
 	// after append a ChangeTerm entry from the current term, all entries
 	// should be committed
-	tt.send(Message{To: 1, Type: msgHup})
+	tt.send(Message{From: 1, To: 1, Type: msgHup})
 
 	if sm.log.committed != 4 {
 		t.Errorf("committed = %d, want %d", sm.log.committed, 4)
@@ -204,11 +204,11 @@ func TestDuelingCandidates(t *testing.T) {
 	nt := newNetwork(a, b, c)
 	nt.cut(0, 2)
 
-	nt.send(Message{To: 0, Type: msgHup})
-	nt.send(Message{To: 2, Type: msgHup})
+	nt.send(Message{From: 0, To: 0, Type: msgHup})
+	nt.send(Message{From: 2, To: 2, Type: msgHup})
 
 	nt.recover()
-	nt.send(Message{To: 2, Type: msgHup})
+	nt.send(Message{From: 2, To: 2, Type: msgHup})
 
 	wlog := &log{ents: []Entry{{}, Entry{Type: Normal, Data: nil, Term: 1}}, committed: 1}
 	tests := []struct {
@@ -245,15 +245,15 @@ func TestCandidateConcede(t *testing.T) {
 	tt := newNetwork(nil, nil, nil)
 	tt.isolate(0)
 
-	tt.send(Message{To: 0, Type: msgHup})
-	tt.send(Message{To: 2, Type: msgHup})
+	tt.send(Message{From: 0, To: 0, Type: msgHup})
+	tt.send(Message{From: 2, To: 2, Type: msgHup})
 
 	// heal the partition
 	tt.recover()
 
 	data := []byte("force follower")
 	// send a proposal to 2 to flush out a msgApp to 0
-	tt.send(Message{To: 2, Type: msgProp, Entries: []Entry{{Data: data}}})
+	tt.send(Message{From: 2, To: 2, Type: msgProp, Entries: []Entry{{Data: data}}})
 
 	a := tt.peers[0].(*stateMachine)
 	if g := a.state; g != stateFollower {
@@ -277,7 +277,7 @@ func TestCandidateConcede(t *testing.T) {
 
 func TestSingleNodeCandidate(t *testing.T) {
 	tt := newNetwork(nil)
-	tt.send(Message{To: 0, Type: msgHup})
+	tt.send(Message{From: 0, To: 0, Type: msgHup})
 
 	sm := tt.peers[0].(*stateMachine)
 	if sm.state != stateLeader {
@@ -288,11 +288,11 @@ func TestSingleNodeCandidate(t *testing.T) {
 func TestOldMessages(t *testing.T) {
 	tt := newNetwork(nil, nil, nil)
 	// make 0 leader @ term 3
-	tt.send(Message{To: 0, Type: msgHup})
-	tt.send(Message{To: 1, Type: msgHup})
-	tt.send(Message{To: 0, Type: msgHup})
+	tt.send(Message{From: 0, To: 0, Type: msgHup})
+	tt.send(Message{From: 1, To: 1, Type: msgHup})
+	tt.send(Message{From: 0, To: 0, Type: msgHup})
 	// pretend we're an old leader trying to make progress
-	tt.send(Message{To: 0, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}})
+	tt.send(Message{From: 0, To: 0, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}})
 
 	l := &log{
 		ents: []Entry{
@@ -346,8 +346,8 @@ func TestProposal(t *testing.T) {
 		data := []byte("somedata")
 
 		// promote 0 the leader
-		send(Message{To: 0, Type: msgHup})
-		send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: data}}})
+		send(Message{From: 0, To: 0, Type: msgHup})
+		send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Data: data}}})
 
 		wantLog := newLog()
 		if tt.success {
@@ -380,10 +380,10 @@ func TestProposalByProxy(t *testing.T) {
 
 	for i, tt := range tests {
 		// promote 0 the leader
-		tt.send(Message{To: 0, Type: msgHup})
+		tt.send(Message{From: 0, To: 0, Type: msgHup})
 
 		// propose via follower
-		tt.send(Message{To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}})
+		tt.send(Message{From: 1, To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}})
 
 		wantLog := &log{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1}, {Term: 1, Data: data}}, committed: 2}
 		base := ltoa(wantLog)
@@ -613,7 +613,7 @@ func TestConf(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeLeader()
 
-	sm.Step(Message{Type: msgProp, Entries: []Entry{{Type: AddNode}}})
+	sm.Step(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Type: AddNode}}})
 	if sm.log.lastIndex() != 2 {
 		t.Errorf("lastindex = %d, want %d", sm.log.lastIndex(), 1)
 	}
@@ -625,7 +625,7 @@ func TestConf(t *testing.T) {
 	}
 
 	// deny the second configuration change request if there is a pending one
-	sm.Step(Message{Type: msgProp, Entries: []Entry{{Type: AddNode}}})
+	sm.Step(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{Type: AddNode}}})
 	if sm.log.lastIndex() != 2 {
 		t.Errorf("lastindex = %d, want %d", sm.log.lastIndex(), 1)
 	}
@@ -760,7 +760,7 @@ func TestRecvMsgBeat(t *testing.T) {
 		sm.log = &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}}
 		sm.term.Set(1)
 		sm.state = tt.state
-		sm.Step(Message{Type: msgBeat})
+		sm.Step(Message{From: 0, To: 0, Type: msgBeat})
 
 		msgs := sm.Msgs()
 		if len(msgs) != tt.wMsg {
@@ -883,7 +883,7 @@ func TestProvideSnap(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeLeader()
 
-	sm.Step(Message{Type: msgBeat})
+	sm.Step(Message{From: 0, To: 0, Type: msgBeat})
 	msgs := sm.Msgs()
 	if len(msgs) != 1 {
 		t.Errorf("len(msgs) = %d, want 1", len(msgs))
@@ -897,7 +897,7 @@ func TestProvideSnap(t *testing.T) {
 	// node 1 needs a snapshot
 	sm.ins[1].next = sm.log.offset
 
-	sm.Step(Message{Type: msgBeat})
+	sm.Step(Message{From: 0, To: 0, Type: msgBeat})
 	msgs = sm.Msgs()
 	if len(msgs) != 1 {
 		t.Errorf("len(msgs) = %d, want 1", len(msgs))
@@ -927,11 +927,11 @@ func TestRestoreFromSnapMsg(t *testing.T) {
 
 func TestSlowNodeRestore(t *testing.T) {
 	nt := newNetwork(nil, nil, nil)
-	nt.send(Message{To: 0, Type: msgHup})
+	nt.send(Message{From: 0, To: 0, Type: msgHup})
 
 	nt.isolate(2)
 	for j := 0; j < defaultCompactThreshold+1; j++ {
-		nt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{}}})
+		nt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{}}})
 	}
 	lead := nt.peers[0].(*stateMachine)
 	lead.nextEnts()
@@ -940,7 +940,7 @@ func TestSlowNodeRestore(t *testing.T) {
 	}
 
 	nt.recover()
-	nt.send(Message{To: 0, Type: msgBeat})
+	nt.send(Message{From: 0, To: 0, Type: msgBeat})
 
 	follower := nt.peers[2].(*stateMachine)
 	if !reflect.DeepEqual(follower.snapshoter.GetSnap(), lead.snapshoter.GetSnap()) {
@@ -948,7 +948,7 @@ func TestSlowNodeRestore(t *testing.T) {
 	}
 
 	committed := follower.log.lastIndex()
-	nt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{}}})
+	nt.send(Message{From: 0, To: 0, Type: msgProp, Entries: []Entry{{}}})
 	if follower.log.committed != committed+1 {
 		t.Errorf("follower.comitted = %d, want %d", follower.log.committed, committed+1)
 	}