Selaa lähdekoodia

Merge pull request #1254 from coreos/rand_etimeout

Rand etimeout
Xiang Li 11 vuotta sitten
vanhempi
commit
5f3fe7c61f
3 muutettua tiedostoa jossa 72 lisäystä ja 27 poistoa
  1. 3 3
      raft/node_test.go
  2. 14 2
      raft/raft.go
  3. 55 22
      raft/raft_test.go

+ 3 - 3
raft/node_test.go

@@ -175,7 +175,7 @@ func TestNode(t *testing.T) {
 		},
 	}
 
-	n := StartNode(1, []int64{1}, 0, 0)
+	n := StartNode(1, []int64{1}, 10, 1)
 	n.Campaign(ctx)
 	if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) {
 		t.Errorf("#%d: g = %+v,\n             w   %+v", 1, g, wants[0])
@@ -207,7 +207,7 @@ func TestNodeRestart(t *testing.T) {
 		CommittedEntries: entries[1 : st.Commit+1],
 	}
 
-	n := RestartNode(1, []int64{1}, 0, 0, nil, st, entries)
+	n := RestartNode(1, []int64{1}, 10, 1, nil, st, entries)
 	if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
 		t.Errorf("g = %+v,\n             w   %+v", g, want)
 	}
@@ -224,7 +224,7 @@ func TestNodeRestart(t *testing.T) {
 func TestNodeCompact(t *testing.T) {
 	ctx := context.Background()
 	n := newNode()
-	r := newRaft(1, []int64{1}, 0, 0)
+	r := newRaft(1, []int64{1}, 10, 1)
 	go n.run(r)
 
 	n.Campaign(ctx)

+ 14 - 2
raft/raft.go

@@ -3,6 +3,7 @@ package raft
 import (
 	"errors"
 	"fmt"
+	"math/rand"
 	"sort"
 
 	pb "github.com/coreos/etcd/raft/raftpb"
@@ -132,6 +133,7 @@ func newRaft(id int64, peers []int64, election, heartbeat int) *raft {
 	if id == None {
 		panic("cannot use none id")
 	}
+	rand.Seed(id)
 	r := &raft{
 		id:               id,
 		lead:             None,
@@ -286,8 +288,7 @@ func (r *raft) tickElection() {
 		return
 	}
 	r.elapsed++
-	// TODO (xiangli): elctionTimeout should be randomized.
-	if r.elapsed > r.electionTimeout {
+	if r.isElectionTimeout() {
 		r.elapsed = 0
 		r.Step(pb.Message{From: r.id, Type: msgHup})
 	}
@@ -585,3 +586,14 @@ func (r *raft) loadState(state pb.HardState) {
 	r.Vote = state.Vote
 	r.Commit = state.Commit
 }
+
+// isElectionTimeout returns true if r.elapsed is greater than the
+// randomized election timeout in (electiontimeout, 2 * electiontimeout - 1).
+// Otherwise, it returns false.
+func (r *raft) isElectionTimeout() bool {
+	d := r.elapsed - r.electionTimeout
+	if d < 0 {
+		return false
+	}
+	return d > rand.Int()%r.electionTimeout
+}

+ 55 - 22
raft/raft_test.go

@@ -3,6 +3,7 @@ package raft
 import (
 	"bytes"
 	"fmt"
+	"math"
 	"math/rand"
 	"reflect"
 	"sort"
@@ -201,9 +202,9 @@ func TestCommitWithoutNewTermEntry(t *testing.T) {
 }
 
 func TestDuelingCandidates(t *testing.T) {
-	a := newRaft(-1, nil, 0, 0) // k, id are set later
-	b := newRaft(-1, nil, 0, 0)
-	c := newRaft(-1, nil, 0, 0)
+	a := newRaft(-1, nil, 10, 1) // k, id are set later
+	b := newRaft(-1, nil, 10, 1)
+	c := newRaft(-1, nil, 10, 1)
 
 	nt := newNetwork(a, b, c)
 	nt.cut(1, 3)
@@ -492,6 +493,38 @@ func TestCommit(t *testing.T) {
 	}
 }
 
+func TestIsElectionTimeout(t *testing.T) {
+	tests := []struct {
+		elapse       int
+		wprobability float64
+		round        bool
+	}{
+		{5, 0, false},
+		{13, 0.3, true},
+		{15, 0.5, true},
+		{18, 0.8, true},
+		{20, 1, false},
+	}
+
+	for i, tt := range tests {
+		sm := newRaft(1, []int64{1}, 10, 1)
+		sm.elapsed = tt.elapse
+		c := 0
+		for j := 0; j < 10000; j++ {
+			if sm.isElectionTimeout() {
+				c++
+			}
+		}
+		got := float64(c) / 10000.0
+		if tt.round {
+			got = math.Floor(got*10+0.5) / 10.0
+		}
+		if got != tt.wprobability {
+			t.Errorf("#%d: possibility = %v, want %v", i, got, tt.wprobability)
+		}
+	}
+}
+
 // ensure that the Step function ignores the message from old term and does not pass it to the
 // acutal stepX function.
 func TestStepIgnoreOldTermMsg(t *testing.T) {
@@ -499,7 +532,7 @@ func TestStepIgnoreOldTermMsg(t *testing.T) {
 	fakeStep := func(r *raft, m pb.Message) {
 		called = true
 	}
-	sm := newRaft(1, []int64{1}, 0, 0)
+	sm := newRaft(1, []int64{1}, 10, 1)
 	sm.step = fakeStep
 	sm.Term = 2
 	sm.Step(pb.Message{Type: msgApp, Term: sm.Term - 1})
@@ -597,7 +630,7 @@ func TestRecvMsgVote(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		sm := newRaft(1, []int64{1}, 0, 0)
+		sm := newRaft(1, []int64{1}, 10, 1)
 		sm.state = tt.state
 		switch tt.state {
 		case StateFollower:
@@ -654,7 +687,7 @@ func TestStateTransition(t *testing.T) {
 				}
 			}()
 
-			sm := newRaft(1, []int64{1}, 0, 0)
+			sm := newRaft(1, []int64{1}, 10, 1)
 			sm.state = tt.from
 
 			switch tt.to {
@@ -693,7 +726,7 @@ func TestAllServerStepdown(t *testing.T) {
 	tterm := int64(3)
 
 	for i, tt := range tests {
-		sm := newRaft(1, []int64{1, 2, 3}, 0, 0)
+		sm := newRaft(1, []int64{1, 2, 3}, 10, 1)
 		switch tt.state {
 		case StateFollower:
 			sm.becomeFollower(1, None)
@@ -743,7 +776,7 @@ func TestLeaderAppResp(t *testing.T) {
 	for i, tt := range tests {
 		// sm term is 1 after it becomes the leader.
 		// thus the last log term must be 1 to be committed.
-		sm := newRaft(1, []int64{1, 2, 3}, 0, 0)
+		sm := newRaft(1, []int64{1, 2, 3}, 10, 1)
 		sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
 		sm.becomeCandidate()
 		sm.becomeLeader()
@@ -775,7 +808,7 @@ func TestBcastBeat(t *testing.T) {
 		Term:  1,
 		Nodes: []int64{1, 2, 3},
 	}
-	sm := newRaft(1, []int64{1, 2, 3}, 0, 0)
+	sm := newRaft(1, []int64{1, 2, 3}, 10, 1)
 	sm.Term = 1
 	sm.restore(s)
 
@@ -825,7 +858,7 @@ func TestRecvMsgBeat(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		sm := newRaft(1, []int64{1, 2, 3}, 0, 0)
+		sm := newRaft(1, []int64{1, 2, 3}, 10, 1)
 		sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
 		sm.Term = 1
 		sm.state = tt.state
@@ -858,7 +891,7 @@ func TestRestore(t *testing.T) {
 		Nodes: []int64{1, 2, 3},
 	}
 
-	sm := newRaft(1, []int64{1, 2}, 0, 0)
+	sm := newRaft(1, []int64{1, 2}, 10, 1)
 	if ok := sm.restore(s); !ok {
 		t.Fatal("restore fail, want succeed")
 	}
@@ -891,7 +924,7 @@ func TestProvideSnap(t *testing.T) {
 		Term:  defaultCompactThreshold + 1,
 		Nodes: []int64{1, 2},
 	}
-	sm := newRaft(1, []int64{1}, 0, 0)
+	sm := newRaft(1, []int64{1}, 10, 1)
 	// restore the statemachin from a snapshot
 	// so it has a compacted log and a snapshot
 	sm.restore(s)
@@ -922,7 +955,7 @@ func TestRestoreFromSnapMsg(t *testing.T) {
 	}
 	m := pb.Message{Type: msgSnap, From: 1, Term: 2, Snapshot: s}
 
-	sm := newRaft(2, []int64{1, 2}, 0, 0)
+	sm := newRaft(2, []int64{1, 2}, 10, 1)
 	sm.Step(m)
 
 	if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
@@ -961,7 +994,7 @@ func TestSlowNodeRestore(t *testing.T) {
 // it appends the entry to log and sets pendingConf to be true.
 func TestStepConfig(t *testing.T) {
 	// a raft that cannot make progress
-	r := newRaft(1, []int64{1, 2}, 0, 0)
+	r := newRaft(1, []int64{1, 2}, 10, 1)
 	r.becomeCandidate()
 	r.becomeLeader()
 	index := r.raftLog.lastIndex()
@@ -979,7 +1012,7 @@ func TestStepConfig(t *testing.T) {
 // the proposal and keep its original state.
 func TestStepIgnoreConfig(t *testing.T) {
 	// a raft that cannot make progress
-	r := newRaft(1, []int64{1, 2}, 0, 0)
+	r := newRaft(1, []int64{1, 2}, 10, 1)
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
@@ -1005,7 +1038,7 @@ func TestRecoverPendingConfig(t *testing.T) {
 		{pb.EntryConfChange, true},
 	}
 	for i, tt := range tests {
-		r := newRaft(1, []int64{1, 2}, 0, 0)
+		r := newRaft(1, []int64{1, 2}, 10, 1)
 		r.appendEntry(pb.Entry{Type: tt.entType})
 		r.becomeCandidate()
 		r.becomeLeader()
@@ -1024,7 +1057,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
 				t.Errorf("expect panic, but nothing happens")
 			}
 		}()
-		r := newRaft(1, []int64{1, 2}, 0, 0)
+		r := newRaft(1, []int64{1, 2}, 10, 1)
 		r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
 		r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
 		r.becomeCandidate()
@@ -1034,7 +1067,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
 
 // TestAddNode tests that addNode could update pendingConf and nodes correctly.
 func TestAddNode(t *testing.T) {
-	r := newRaft(1, []int64{1}, 0, 0)
+	r := newRaft(1, []int64{1}, 10, 1)
 	r.pendingConf = true
 	r.addNode(2)
 	if r.pendingConf != false {
@@ -1051,7 +1084,7 @@ func TestAddNode(t *testing.T) {
 // TestRemoveNode tests that removeNode could update pendingConf, nodes and
 // and removed list correctly.
 func TestRemoveNode(t *testing.T) {
-	r := newRaft(1, []int64{1, 2}, 0, 0)
+	r := newRaft(1, []int64{1, 2}, 10, 1)
 	r.pendingConf = true
 	r.removeNode(2)
 	if r.pendingConf != false {
@@ -1074,7 +1107,7 @@ func TestRecvMsgDenied(t *testing.T) {
 	fakeStep := func(r *raft, m pb.Message) {
 		called = true
 	}
-	r := newRaft(1, []int64{1, 2}, 0, 0)
+	r := newRaft(1, []int64{1, 2}, 10, 1)
 	r.step = fakeStep
 	r.Step(pb.Message{From: 2, Type: msgDenied})
 	if called != false {
@@ -1102,7 +1135,7 @@ func TestRecvMsgFromRemovedNode(t *testing.T) {
 		fakeStep := func(r *raft, m pb.Message) {
 			called = true
 		}
-		r := newRaft(1, []int64{1}, 0, 0)
+		r := newRaft(1, []int64{1}, 10, 1)
 		r.step = fakeStep
 		r.removeNode(tt.from)
 		r.Step(pb.Message{From: tt.from, Type: msgVote})
@@ -1176,7 +1209,7 @@ func newNetwork(peers ...Interface) *network {
 		id := peerAddrs[i]
 		switch v := p.(type) {
 		case nil:
-			sm := newRaft(id, peerAddrs, 0, 0)
+			sm := newRaft(id, peerAddrs, 10, 1)
 			npeers[id] = sm
 		case *raft:
 			v.id = id