Browse Source

raft: make raft configurable

Xiang Li 10 years ago
parent
commit
d9b5b56c82
9 changed files with 205 additions and 93 deletions
  1. 10 1
      raft/multinode.go
  2. 25 5
      raft/node.go
  3. 1 1
      raft/node_bench_test.go
  4. 5 5
      raft/node_test.go
  5. 83 14
      raft/raft.go
  6. 3 3
      raft/raft_flow_control_test.go
  7. 22 22
      raft/raft_paper_test.go
  8. 5 5
      raft/raft_snap_test.go
  9. 51 37
      raft/raft_test.go

+ 10 - 1
raft/multinode.go

@@ -182,7 +182,16 @@ func (mn *multiNode) run() {
 		select {
 		case gc := <-mn.groupc:
 			// TODO(bdarnell): pass applied through gc and into newRaft. Or get rid of it?
-			r := newRaft(mn.id, nil, mn.election, mn.heartbeat, gc.storage, 0)
+			// TODO(bdarnell): make maxSizePerMsg(InflightMsgs) configurable
+			c := &Config{
+				ID:              mn.id,
+				ElectionTick:    mn.election,
+				HeartbeatTick:   mn.heartbeat,
+				Storage:         gc.storage,
+				MaxSizePerMsg:   noLimit,
+				MaxInflightMsgs: 256,
+			}
+			r := newRaft(c)
 			group = &groupState{
 				id:   gc.id,
 				raft: r,

+ 25 - 5
raft/node.go

@@ -144,9 +144,17 @@ type Peer struct {
 // the election and heartbeat timeouts in units of ticks.
 // It appends a ConfChangeAddNode entry for each given peer to the initial log.
 func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage) Node {
-	n := newNode()
-	r := newRaft(id, nil, election, heartbeat, storage, 0)
-
+	c := &Config{
+		ID:            id,
+		Peers:         nil,
+		ElectionTick:  election,
+		HeartbeatTick: heartbeat,
+		Storage:       storage,
+		// TODO(xiangli): make this configurable
+		MaxSizePerMsg:   noLimit,
+		MaxInflightMsgs: 256,
+	}
+	r := newRaft(c)
 	// become the follower at term 1 and apply initial configuration
 	// entires of term 1
 	r.becomeFollower(1, None)
@@ -177,6 +185,7 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage
 		r.addNode(peer.ID)
 	}
 
+	n := newNode()
 	go n.run(r)
 	return &n
 }
@@ -186,9 +195,20 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage
 // If the caller has an existing state machine, pass in the last log index that
 // has been applied to it; otherwise use zero.
 func RestartNode(id uint64, election, heartbeat int, storage Storage, applied uint64) Node {
-	n := newNode()
-	r := newRaft(id, nil, election, heartbeat, storage, applied)
+	c := &Config{
+		ID:            id,
+		Peers:         nil,
+		ElectionTick:  election,
+		HeartbeatTick: heartbeat,
+		Storage:       storage,
+		Applied:       applied,
+		// TODO(xiangli): make this configurable
+		MaxSizePerMsg:   noLimit,
+		MaxInflightMsgs: 256,
+	}
+	r := newRaft(c)
 
+	n := newNode()
 	go n.run(r)
 	return &n
 }

+ 1 - 1
raft/node_bench_test.go

@@ -26,7 +26,7 @@ func BenchmarkOneNode(b *testing.B) {
 
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newRaft(1, []uint64{1}, 10, 1, s, 0)
+	r := newTestRaft(1, []uint64{1}, 10, 1, s)
 	go n.run(r)
 
 	defer n.Stop()

+ 5 - 5
raft/node_test.go

@@ -114,7 +114,7 @@ func TestNodePropose(t *testing.T) {
 
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newRaft(1, []uint64{1}, 10, 1, s, 0)
+	r := newTestRaft(1, []uint64{1}, 10, 1, s)
 	go n.run(r)
 	n.Campaign(context.TODO())
 	for {
@@ -152,7 +152,7 @@ func TestNodeProposeConfig(t *testing.T) {
 
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newRaft(1, []uint64{1}, 10, 1, s, 0)
+	r := newTestRaft(1, []uint64{1}, 10, 1, s)
 	go n.run(r)
 	n.Campaign(context.TODO())
 	for {
@@ -190,7 +190,7 @@ func TestNodeProposeConfig(t *testing.T) {
 // who is the current leader.
 func TestBlockProposal(t *testing.T) {
 	n := newNode()
-	r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
 	go n.run(r)
 	defer n.Stop()
 
@@ -223,7 +223,7 @@ func TestBlockProposal(t *testing.T) {
 func TestNodeTick(t *testing.T) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newRaft(1, []uint64{1}, 10, 1, s, 0)
+	r := newTestRaft(1, []uint64{1}, 10, 1, s)
 	go n.run(r)
 	elapsed := r.elapsed
 	n.Tick()
@@ -238,7 +238,7 @@ func TestNodeTick(t *testing.T) {
 func TestNodeStop(t *testing.T) {
 	n := newNode()
 	s := NewMemoryStorage()
-	r := newRaft(1, []uint64{1}, 10, 1, s, 0)
+	r := newTestRaft(1, []uint64{1}, 10, 1, s)
 	donec := make(chan struct{})
 
 	go func() {

+ 83 - 14
raft/raft.go

@@ -51,6 +51,75 @@ func (st StateType) String() string {
 	return stmap[uint64(st)]
 }
 
+// Config contains the parameters to start a raft.
+type Config struct {
+	// ID is the identity of the local raft. ID cannot be 0.
+	ID uint64
+	// Peers contains the IDs of all nodes (including self) in
+	// the raft cluster. It should only be set when starting a new
+	// raft cluster.
+	// Restarting raft from previous configuration will panic if
+	// Peers is set.
+	Peers []uint64
+
+	// ElectionTick is the election timeout. If a follower does not
+	// receive any message from the leader of current term during
+	// ElectionTick, it will become candidate and start an election.
+	// ElectionTick must be greater than HeartbeatTick. We suggest
+	// to use ElectionTick = 10 * HeartbeatTick to avoid unnecessary
+	// leader switching.
+	ElectionTick int
+	// HeartbeatTick is the heartbeat interval. A leader sends heartbeat
+	// message to maintain the leadership every heartbeat interval.
+	HeartbeatTick int
+
+	// Storage is the storage for raft. raft generates entires and
+	// states to be stored in storage. raft reads the persisted entires
+	// and states out of Storage when it needs. raft reads out the previous
+	// state and configuration out of storage when restarting.
+	Storage Storage
+	// Applied is the last applied index. It should only be set when restarting
+	// raft. raft will not return entries to the application smaller or equal to Applied.
+	// If Applied is unset when restarting, raft might return previous applied entries.
+	// This is a very application dependent configuration.
+	Applied uint64
+
+	// MaxSizePerMsg limits the max size of each append message. Smaller value lowers
+	// the raft recovery cost(initial probing and message lost during normal operation).
+	// On the other side, it might affect the throughput during normal replication.
+	// Note: math.MaxUint64 for unlimited, 0 for at most one entry per message.
+	MaxSizePerMsg uint64
+	// MaxInflightMsgs limits the max number of in-flight append messages during optimistic
+	// replication phase. The application transportation layer usually has its own sending
+	// buffer over TCP/UDP. Setting MaxInflightMsgs to avoid overflowing that sending buffer.
+	// TODO (xiangli): feedback to application to limit the proposal rate?
+	MaxInflightMsgs int
+}
+
+func (c *Config) validate() error {
+	if c.ID == None {
+		return errors.New("cannot use none as id")
+	}
+
+	if c.HeartbeatTick <= 0 {
+		return errors.New("heartbeat tick must be greater than 0")
+	}
+
+	if c.ElectionTick <= c.HeartbeatTick {
+		return errors.New("election tick must be greater than heartbeat tick")
+	}
+
+	if c.Storage == nil {
+		return errors.New("storage cannot be nil")
+	}
+
+	if c.MaxInflightMsgs <= 0 {
+		return errors.New("max inflight messages must be greater than 0")
+	}
+
+	return nil
+}
+
 type raft struct {
 	pb.HardState
 
@@ -83,16 +152,16 @@ type raft struct {
 	step             stepFunc
 }
 
-func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage,
-	applied uint64) *raft {
-	if id == None {
-		panic("cannot use none id")
+func newRaft(c *Config) *raft {
+	if err := c.validate(); err != nil {
+		panic(err.Error())
 	}
-	raftlog := newLog(storage)
-	hs, cs, err := storage.InitialState()
+	raftlog := newLog(c.Storage)
+	hs, cs, err := c.Storage.InitialState()
 	if err != nil {
 		panic(err) // TODO(bdarnell)
 	}
+	peers := c.Peers
 	if len(cs.Nodes) > 0 {
 		if len(peers) > 0 {
 			// TODO(bdarnell): the peers argument is always nil except in
@@ -103,27 +172,27 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage
 		peers = cs.Nodes
 	}
 	r := &raft{
-		id:      id,
+		id:      c.ID,
 		lead:    None,
 		raftLog: raftlog,
 		// 4MB for now and hard code it
 		// TODO(xiang): add a config arguement into newRaft after we add
 		// the max inflight message field.
-		maxMsgSize:       4 * 1024 * 1024,
-		maxInflight:      256,
+		maxMsgSize:       c.MaxSizePerMsg,
+		maxInflight:      c.MaxInflightMsgs,
 		prs:              make(map[uint64]*Progress),
-		electionTimeout:  election,
-		heartbeatTimeout: heartbeat,
+		electionTimeout:  c.ElectionTick,
+		heartbeatTimeout: c.HeartbeatTick,
 	}
-	r.rand = rand.New(rand.NewSource(int64(id)))
+	r.rand = rand.New(rand.NewSource(int64(c.ID)))
 	for _, p := range peers {
 		r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
 	}
 	if !isHardStateEqual(hs, emptyState) {
 		r.loadState(hs)
 	}
-	if applied > 0 {
-		raftlog.appliedTo(applied)
+	if c.Applied > 0 {
+		raftlog.appliedTo(c.Applied)
 	}
 	r.becomeFollower(r.Term, None)
 

+ 3 - 3
raft/raft_flow_control_test.go

@@ -24,7 +24,7 @@ import (
 // 1. msgApp can fill the sending window until full
 // 2. when the window is full, no more msgApp can be sent.
 func TestMsgAppFlowControlFull(t *testing.T) {
-	r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
 	r.becomeCandidate()
 	r.becomeLeader()
 
@@ -60,7 +60,7 @@ func TestMsgAppFlowControlFull(t *testing.T) {
 // 1. vaild msgAppResp.index moves the windows to pass all smaller or equal index.
 // 2. out-of-dated msgAppResp has no effect on the silding window.
 func TestMsgAppFlowControlMoveForward(t *testing.T) {
-	r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
 	r.becomeCandidate()
 	r.becomeLeader()
 
@@ -105,7 +105,7 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
 // TestMsgAppFlowControlRecvHeartbeat ensures a heartbeat response
 // frees one slot if the window is full.
 func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
-	r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
 	r.becomeCandidate()
 	r.becomeLeader()
 

+ 22 - 22
raft/raft_paper_test.go

@@ -52,7 +52,7 @@ func TestLeaderUpdateTermFromMessage(t *testing.T) {
 // it immediately reverts to follower state.
 // Reference: section 5.1
 func testUpdateTermFromMessage(t *testing.T, state StateType) {
-	r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	switch state {
 	case StateFollower:
 		r.becomeFollower(1, 2)
@@ -82,7 +82,7 @@ func TestRejectStaleTermMessage(t *testing.T) {
 	fakeStep := func(r *raft, m pb.Message) {
 		called = true
 	}
-	r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	r.step = fakeStep
 	r.loadState(pb.HardState{Term: 2})
 
@@ -96,7 +96,7 @@ func TestRejectStaleTermMessage(t *testing.T) {
 // TestStartAsFollower tests that when servers start up, they begin as followers.
 // Reference: section 5.2
 func TestStartAsFollower(t *testing.T) {
-	r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	if r.state != StateFollower {
 		t.Errorf("state = %s, want %s", r.state, StateFollower)
 	}
@@ -109,7 +109,7 @@ func TestStartAsFollower(t *testing.T) {
 func TestLeaderBcastBeat(t *testing.T) {
 	// heartbeat interval
 	hi := 1
-	r := newRaft(1, []uint64{1, 2, 3}, 10, hi, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2, 3}, 10, hi, NewMemoryStorage())
 	r.becomeCandidate()
 	r.becomeLeader()
 	for i := 0; i < 10; i++ {
@@ -151,7 +151,7 @@ func TestCandidateStartNewElection(t *testing.T) {
 func testNonleaderStartElection(t *testing.T, state StateType) {
 	// election timeout
 	et := 10
-	r := newRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage())
 	switch state {
 	case StateFollower:
 		r.becomeFollower(1, 2)
@@ -215,7 +215,7 @@ func TestLeaderElectionInOneRoundRPC(t *testing.T) {
 		{5, map[uint64]bool{}, StateCandidate},
 	}
 	for i, tt := range tests {
-		r := newRaft(1, idsBySize(tt.size), 10, 1, NewMemoryStorage(), 0)
+		r := newTestRaft(1, idsBySize(tt.size), 10, 1, NewMemoryStorage())
 
 		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
 		for id, vote := range tt.votes {
@@ -248,7 +248,7 @@ func TestFollowerVote(t *testing.T) {
 		{2, 1, true},
 	}
 	for i, tt := range tests {
-		r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
+		r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 		r.loadState(pb.HardState{Term: 1, Vote: tt.vote})
 
 		r.Step(pb.Message{From: tt.nvote, To: 1, Term: 1, Type: pb.MsgVote})
@@ -274,7 +274,7 @@ func TestCandidateFallback(t *testing.T) {
 		{From: 2, To: 1, Term: 2, Type: pb.MsgApp},
 	}
 	for i, tt := range tests {
-		r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
+		r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
 		if r.state != StateCandidate {
 			t.Fatalf("unexpected state = %s, want %s", r.state, StateCandidate)
@@ -307,7 +307,7 @@ func TestCandidateElectionTimeoutRandomized(t *testing.T) {
 // Reference: section 5.2
 func testNonleaderElectionTimeoutRandomized(t *testing.T, state StateType) {
 	et := 10
-	r := newRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage())
 	timeouts := make(map[int]bool)
 	for round := 0; round < 50*et; round++ {
 		switch state {
@@ -353,7 +353,7 @@ func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) {
 	rs := make([]*raft, size)
 	ids := idsBySize(size)
 	for k := range rs {
-		rs[k] = newRaft(ids[k], ids, et, 1, NewMemoryStorage(), 0)
+		rs[k] = newTestRaft(ids[k], ids, et, 1, NewMemoryStorage())
 	}
 	conflicts := 0
 	for round := 0; round < 1000; round++ {
@@ -396,7 +396,7 @@ func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) {
 // Reference: section 5.3
 func TestLeaderStartReplication(t *testing.T) {
 	s := NewMemoryStorage()
-	r := newRaft(1, []uint64{1, 2, 3}, 10, 1, s, 0)
+	r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s)
 	r.becomeCandidate()
 	r.becomeLeader()
 	commitNoopEntry(r, s)
@@ -435,7 +435,7 @@ func TestLeaderStartReplication(t *testing.T) {
 // Reference: section 5.3
 func TestLeaderCommitEntry(t *testing.T) {
 	s := NewMemoryStorage()
-	r := newRaft(1, []uint64{1, 2, 3}, 10, 1, s, 0)
+	r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s)
 	r.becomeCandidate()
 	r.becomeLeader()
 	commitNoopEntry(r, s)
@@ -489,7 +489,7 @@ func TestLeaderAcknowledgeCommit(t *testing.T) {
 	}
 	for i, tt := range tests {
 		s := NewMemoryStorage()
-		r := newRaft(1, idsBySize(tt.size), 10, 1, s, 0)
+		r := newTestRaft(1, idsBySize(tt.size), 10, 1, s)
 		r.becomeCandidate()
 		r.becomeLeader()
 		commitNoopEntry(r, s)
@@ -523,7 +523,7 @@ func TestLeaderCommitPrecedingEntries(t *testing.T) {
 	for i, tt := range tests {
 		storage := NewMemoryStorage()
 		storage.Append(tt)
-		r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0)
+		r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
 		r.loadState(pb.HardState{Term: 2})
 		r.becomeCandidate()
 		r.becomeLeader()
@@ -578,7 +578,7 @@ func TestFollowerCommitEntry(t *testing.T) {
 		},
 	}
 	for i, tt := range tests {
-		r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
+		r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 		r.becomeFollower(1, 2)
 
 		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 1, Entries: tt.ents, Commit: tt.commit})
@@ -621,7 +621,7 @@ func TestFollowerCheckMsgApp(t *testing.T) {
 	for i, tt := range tests {
 		storage := NewMemoryStorage()
 		storage.Append(ents)
-		r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0)
+		r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
 		r.loadState(pb.HardState{Commit: 1})
 		r.becomeFollower(2, 2)
 
@@ -677,7 +677,7 @@ func TestFollowerAppendEntries(t *testing.T) {
 	for i, tt := range tests {
 		storage := NewMemoryStorage()
 		storage.Append([]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}})
-		r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0)
+		r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
 		r.becomeFollower(2, 2)
 
 		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents})
@@ -746,11 +746,11 @@ func TestLeaderSyncFollowerLog(t *testing.T) {
 	for i, tt := range tests {
 		leadStorage := NewMemoryStorage()
 		leadStorage.Append(ents)
-		lead := newRaft(1, []uint64{1, 2, 3}, 10, 1, leadStorage, 0)
+		lead := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, leadStorage)
 		lead.loadState(pb.HardState{Commit: lead.raftLog.lastIndex(), Term: term})
 		followerStorage := NewMemoryStorage()
 		followerStorage.Append(tt)
-		follower := newRaft(2, []uint64{1, 2, 3}, 10, 1, followerStorage, 0)
+		follower := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, followerStorage)
 		follower.loadState(pb.HardState{Term: term - 1})
 		// It is necessary to have a three-node cluster.
 		// The second may have more up-to-date log than the first one, so the
@@ -779,7 +779,7 @@ func TestVoteRequest(t *testing.T) {
 		{[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}, 3},
 	}
 	for j, tt := range tests {
-		r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
+		r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 		r.Step(pb.Message{
 			From: 2, To: 1, Type: pb.MsgApp, Term: tt.wterm - 1, LogTerm: 0, Index: 0, Entries: tt.ents,
 		})
@@ -842,7 +842,7 @@ func TestVoter(t *testing.T) {
 	for i, tt := range tests {
 		storage := NewMemoryStorage()
 		storage.Append(tt.ents)
-		r := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
+		r := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
 
 		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVote, Term: 3, LogTerm: tt.logterm, Index: tt.index})
 
@@ -878,7 +878,7 @@ func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) {
 	for i, tt := range tests {
 		storage := NewMemoryStorage()
 		storage.Append(ents)
-		r := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
+		r := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
 		r.loadState(pb.HardState{Term: 2})
 		// become leader at term 3
 		r.becomeCandidate()

+ 5 - 5
raft/raft_snap_test.go

@@ -32,7 +32,7 @@ var (
 
 func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
 	storage := NewMemoryStorage()
-	sm := newRaft(1, []uint64{1}, 10, 1, storage, 0)
+	sm := newTestRaft(1, []uint64{1}, 10, 1, storage)
 	sm.restore(testingSnap)
 
 	sm.becomeCandidate()
@@ -50,7 +50,7 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
 
 func TestPendingSnapshotPauseReplication(t *testing.T) {
 	storage := NewMemoryStorage()
-	sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
+	sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
 	sm.restore(testingSnap)
 
 	sm.becomeCandidate()
@@ -67,7 +67,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) {
 
 func TestSnapshotFailure(t *testing.T) {
 	storage := NewMemoryStorage()
-	sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
+	sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
 	sm.restore(testingSnap)
 
 	sm.becomeCandidate()
@@ -90,7 +90,7 @@ func TestSnapshotFailure(t *testing.T) {
 
 func TestSnapshotSucceed(t *testing.T) {
 	storage := NewMemoryStorage()
-	sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
+	sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
 	sm.restore(testingSnap)
 
 	sm.becomeCandidate()
@@ -113,7 +113,7 @@ func TestSnapshotSucceed(t *testing.T) {
 
 func TestSnapshotAbort(t *testing.T) {
 	storage := NewMemoryStorage()
-	sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
+	sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
 	sm.restore(testingSnap)
 
 	sm.becomeCandidate()

+ 51 - 37
raft/raft_test.go

@@ -262,7 +262,7 @@ func TestProgressResume(t *testing.T) {
 
 // TestProgressResumeByHeartbeat ensures raft.heartbeat reset progress.paused by heartbeat.
 func TestProgressResumeByHeartbeat(t *testing.T) {
-	r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.prs[2].Paused = true
@@ -274,7 +274,7 @@ func TestProgressResumeByHeartbeat(t *testing.T) {
 }
 
 func TestProgressPaused(t *testing.T) {
-	r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
@@ -466,9 +466,9 @@ func TestCommitWithoutNewTermEntry(t *testing.T) {
 }
 
 func TestDuelingCandidates(t *testing.T) {
-	a := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
-	b := newRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
-	c := newRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
+	a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 
 	nt := newNetwork(a, b, c)
 	nt.cut(1, 3)
@@ -736,7 +736,7 @@ func TestCommit(t *testing.T) {
 		storage.Append(tt.logs)
 		storage.hardState = pb.HardState{Term: tt.smTerm}
 
-		sm := newRaft(1, []uint64{1}, 5, 1, storage, 0)
+		sm := newTestRaft(1, []uint64{1}, 5, 1, storage)
 		for j := 0; j < len(tt.matches); j++ {
 			sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1)
 		}
@@ -761,7 +761,7 @@ func TestIsElectionTimeout(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
+		sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
 		sm.elapsed = tt.elapse
 		c := 0
 		for j := 0; j < 10000; j++ {
@@ -786,7 +786,7 @@ func TestStepIgnoreOldTermMsg(t *testing.T) {
 	fakeStep := func(r *raft, m pb.Message) {
 		called = true
 	}
-	sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
+	sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
 	sm.step = fakeStep
 	sm.Term = 2
 	sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1})
@@ -828,7 +828,7 @@ func TestHandleMsgApp(t *testing.T) {
 	for i, tt := range tests {
 		storage := NewMemoryStorage()
 		storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}})
-		sm := newRaft(1, []uint64{1}, 10, 1, storage, 0)
+		sm := newTestRaft(1, []uint64{1}, 10, 1, storage)
 		sm.becomeFollower(2, None)
 
 		sm.handleAppendEntries(tt.m)
@@ -862,7 +862,7 @@ func TestHandleHeartbeat(t *testing.T) {
 	for i, tt := range tests {
 		storage := NewMemoryStorage()
 		storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
-		sm := newRaft(1, []uint64{1, 2}, 5, 1, storage, 0)
+		sm := newTestRaft(1, []uint64{1, 2}, 5, 1, storage)
 		sm.becomeFollower(2, 2)
 		sm.raftLog.commitTo(commit)
 		sm.handleHeartbeat(tt.m)
@@ -883,7 +883,7 @@ func TestHandleHeartbeat(t *testing.T) {
 func TestHandleHeartbeatResp(t *testing.T) {
 	storage := NewMemoryStorage()
 	storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
-	sm := newRaft(1, []uint64{1, 2}, 5, 1, storage, 0)
+	sm := newTestRaft(1, []uint64{1, 2}, 5, 1, storage)
 	sm.becomeCandidate()
 	sm.becomeLeader()
 	sm.raftLog.commitTo(sm.raftLog.lastIndex())
@@ -942,7 +942,7 @@ func TestHandleHeartbeatResp(t *testing.T) {
 // TestMsgAppRespWaitReset verifies the waitReset behavior of a leader
 // MsgAppResp.
 func TestMsgAppRespWaitReset(t *testing.T) {
-	sm := newRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage(), 0)
+	sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage())
 	sm.becomeCandidate()
 	sm.becomeLeader()
 
@@ -1036,7 +1036,7 @@ func TestRecvMsgVote(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
+		sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
 		sm.state = tt.state
 		switch tt.state {
 		case StateFollower:
@@ -1096,7 +1096,7 @@ func TestStateTransition(t *testing.T) {
 				}
 			}()
 
-			sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
+			sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
 			sm.state = tt.from
 
 			switch tt.to {
@@ -1135,7 +1135,7 @@ func TestAllServerStepdown(t *testing.T) {
 	tterm := uint64(3)
 
 	for i, tt := range tests {
-		sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
+		sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 		switch tt.state {
 		case StateFollower:
 			sm.becomeFollower(1, None)
@@ -1194,7 +1194,7 @@ func TestLeaderAppResp(t *testing.T) {
 	for i, tt := range tests {
 		// sm term is 1 after it becomes the leader.
 		// thus the last log term must be 1 to be committed.
-		sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
+		sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 		sm.raftLog = &raftLog{
 			storage:  &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}},
 			unstable: unstable{offset: 3},
@@ -1242,7 +1242,7 @@ func TestBcastBeat(t *testing.T) {
 	}
 	storage := NewMemoryStorage()
 	storage.ApplySnapshot(s)
-	sm := newRaft(1, nil, 10, 1, storage, 0)
+	sm := newTestRaft(1, nil, 10, 1, storage)
 	sm.Term = 1
 
 	sm.becomeCandidate()
@@ -1301,7 +1301,7 @@ func TestRecvMsgBeat(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
+		sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 		sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}}
 		sm.Term = 1
 		sm.state = tt.state
@@ -1344,7 +1344,7 @@ func TestLeaderIncreaseNext(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		sm := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
+		sm := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
 		sm.raftLog.append(previousEnts...)
 		sm.becomeCandidate()
 		sm.becomeLeader()
@@ -1360,7 +1360,7 @@ func TestLeaderIncreaseNext(t *testing.T) {
 }
 
 func TestSendAppendForProgressProbe(t *testing.T) {
-	r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.readMessages()
@@ -1406,7 +1406,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
 }
 
 func TestSendAppendForProgressReplicate(t *testing.T) {
-	r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.readMessages()
@@ -1423,7 +1423,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
 }
 
 func TestSendAppendForProgressSnapshot(t *testing.T) {
-	r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.readMessages()
@@ -1443,7 +1443,7 @@ func TestRecvMsgUnreachable(t *testing.T) {
 	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
 	s := NewMemoryStorage()
 	s.Append(previousEnts)
-	r := newRaft(1, []uint64{1, 2}, 10, 1, s, 0)
+	r := newTestRaft(1, []uint64{1, 2}, 10, 1, s)
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.readMessages()
@@ -1472,7 +1472,7 @@ func TestRestore(t *testing.T) {
 	}
 
 	storage := NewMemoryStorage()
-	sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
+	sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
 	if ok := sm.restore(s); !ok {
 		t.Fatal("restore fail, want succeed")
 	}
@@ -1497,7 +1497,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) {
 	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
 	commit := uint64(1)
 	storage := NewMemoryStorage()
-	sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
+	sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
 	sm.raftLog.append(previousEnts...)
 	sm.raftLog.commitTo(commit)
 
@@ -1538,7 +1538,7 @@ func TestProvideSnap(t *testing.T) {
 		},
 	}
 	storage := NewMemoryStorage()
-	sm := newRaft(1, []uint64{1}, 10, 1, storage, 0)
+	sm := newTestRaft(1, []uint64{1}, 10, 1, storage)
 	sm.restore(s)
 
 	sm.becomeCandidate()
@@ -1569,7 +1569,7 @@ func TestRestoreFromSnapMsg(t *testing.T) {
 	}
 	m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
 
-	sm := newRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
+	sm := newTestRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage())
 	sm.Step(m)
 
 	// TODO(bdarnell): what should this test?
@@ -1604,7 +1604,7 @@ func TestSlowNodeRestore(t *testing.T) {
 // it appends the entry to log and sets pendingConf to be true.
 func TestStepConfig(t *testing.T) {
 	// a raft that cannot make progress
-	r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
 	r.becomeCandidate()
 	r.becomeLeader()
 	index := r.raftLog.lastIndex()
@@ -1622,7 +1622,7 @@ func TestStepConfig(t *testing.T) {
 // the proposal to noop and keep its original state.
 func TestStepIgnoreConfig(t *testing.T) {
 	// a raft that cannot make progress
-	r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
@@ -1649,7 +1649,7 @@ func TestRecoverPendingConfig(t *testing.T) {
 		{pb.EntryConfChange, true},
 	}
 	for i, tt := range tests {
-		r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
+		r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
 		r.appendEntry(pb.Entry{Type: tt.entType})
 		r.becomeCandidate()
 		r.becomeLeader()
@@ -1668,7 +1668,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
 				t.Errorf("expect panic, but nothing happens")
 			}
 		}()
-		r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
+		r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
 		r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
 		r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
 		r.becomeCandidate()
@@ -1678,7 +1678,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
 
 // TestAddNode tests that addNode could update pendingConf and nodes correctly.
 func TestAddNode(t *testing.T) {
-	r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
 	r.pendingConf = true
 	r.addNode(2)
 	if r.pendingConf != false {
@@ -1694,7 +1694,7 @@ func TestAddNode(t *testing.T) {
 // TestRemoveNode tests that removeNode could update pendingConf, nodes and
 // and removed list correctly.
 func TestRemoveNode(t *testing.T) {
-	r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
+	r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
 	r.pendingConf = true
 	r.removeNode(2)
 	if r.pendingConf != false {
@@ -1718,7 +1718,7 @@ func TestPromotable(t *testing.T) {
 		{[]uint64{2, 3}, false},
 	}
 	for i, tt := range tests {
-		r := newRaft(id, tt.peers, 5, 1, NewMemoryStorage(), 0)
+		r := newTestRaft(id, tt.peers, 5, 1, NewMemoryStorage())
 		if g := r.promotable(); g != tt.wp {
 			t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp)
 		}
@@ -1740,7 +1740,7 @@ func TestRaftNodes(t *testing.T) {
 		},
 	}
 	for i, tt := range tests {
-		r := newRaft(1, tt.ids, 10, 1, NewMemoryStorage(), 0)
+		r := newTestRaft(1, tt.ids, 10, 1, NewMemoryStorage())
 		if !reflect.DeepEqual(r.nodes(), tt.wids) {
 			t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids)
 		}
@@ -1752,7 +1752,7 @@ func ents(terms ...uint64) *raft {
 	for i, term := range terms {
 		storage.Append([]pb.Entry{{Index: uint64(i + 1), Term: term}})
 	}
-	sm := newRaft(1, []uint64{}, 5, 1, storage, 0)
+	sm := newTestRaft(1, []uint64{}, 5, 1, storage)
 	sm.reset(0)
 	return sm
 }
@@ -1780,7 +1780,7 @@ func newNetwork(peers ...Interface) *network {
 		switch v := p.(type) {
 		case nil:
 			nstorage[id] = NewMemoryStorage()
-			sm := newRaft(id, peerAddrs, 10, 1, nstorage[id], 0)
+			sm := newTestRaft(id, peerAddrs, 10, 1, nstorage[id])
 			npeers[id] = sm
 		case *raft:
 			v.id = id
@@ -1880,3 +1880,17 @@ func idsBySize(size int) []uint64 {
 	}
 	return ids
 }
+
+func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft {
+	c := &Config{
+		ID:              id,
+		Peers:           peers,
+		ElectionTick:    election,
+		HeartbeatTick:   heartbeat,
+		Storage:         storage,
+		MaxSizePerMsg:   noLimit,
+		MaxInflightMsgs: 256,
+	}
+
+	return newRaft(c)
+}