Browse Source

raft: make node configurable

Xiang Li 10 years ago
parent
commit
abddef0f28
4 changed files with 76 additions and 33 deletions
  1. 34 3
      etcdserver/raft.go
  2. 2 1
      raft/example_test.go
  3. 4 25
      raft/node.go
  4. 36 4
      raft/node_test.go

+ 34 - 3
etcdserver/raft.go

@@ -40,6 +40,13 @@ const (
 	// The max throughput is around 10K. Keep a 5K entries is enough for helping
 	// follower to catch up.
 	numberOfCatchUpEntries = 5000
+
+	// The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
+	// Assuming the RTT is around 10ms, 1MB max size is large enough.
+	maxSizePerMsg = 1 * 1024 * 1024
+	// Never overflow the rafthttp buffer, which is 4096.
+	// TODO: a better const?
+	maxInflightMsgs = 4096 / 8
 )
 
 var (
@@ -204,7 +211,15 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *
 	id = member.ID
 	log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID())
 	s = raft.NewMemoryStorage()
-	n = raft.StartNode(uint64(id), peers, cfg.ElectionTicks, 1, s)
+	c := &raft.Config{
+		ID:              uint64(id),
+		ElectionTick:    cfg.ElectionTicks,
+		HeartbeatTick:   1,
+		Storage:         s,
+		MaxSizePerMsg:   maxSizePerMsg,
+		MaxInflightMsgs: maxInflightMsgs,
+	}
+	n = raft.StartNode(c, peers)
 	raftStatus = n.Status
 	return
 }
@@ -224,7 +239,15 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.N
 	}
 	s.SetHardState(st)
 	s.Append(ents)
-	n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s, 0)
+	c := &raft.Config{
+		ID:              uint64(id),
+		ElectionTick:    cfg.ElectionTicks,
+		HeartbeatTick:   1,
+		Storage:         s,
+		MaxSizePerMsg:   maxSizePerMsg,
+		MaxInflightMsgs: maxInflightMsgs,
+	}
+	n := raft.RestartNode(c)
 	raftStatus = n.Status
 	return id, n, s, w
 }
@@ -266,7 +289,15 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
 	}
 	s.SetHardState(st)
 	s.Append(ents)
-	n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s, 0)
+	c := &raft.Config{
+		ID:              uint64(id),
+		ElectionTick:    cfg.ElectionTicks,
+		HeartbeatTick:   1,
+		Storage:         s,
+		MaxSizePerMsg:   maxSizePerMsg,
+		MaxInflightMsgs: maxInflightMsgs,
+	}
+	n := raft.RestartNode(c)
 	raftStatus = n.Status
 	return id, n, s, w
 }

+ 2 - 1
raft/example_test.go

@@ -24,7 +24,8 @@ func saveStateToDisk(st pb.HardState) {}
 func saveToDisk(ents []pb.Entry)      {}
 
 func Example_Node() {
-	n := StartNode(0, nil, 0, 0, nil)
+	c := &Config{}
+	n := StartNode(c, nil)
 
 	// stuff to n happens in other goroutines
 

+ 4 - 25
raft/node.go

@@ -140,20 +140,10 @@ type Peer struct {
 	Context []byte
 }
 
-// StartNode returns a new Node given a unique raft id, a list of raft peers, and
-// the election and heartbeat timeouts in units of ticks.
+// StartNode returns a new Node given configuration and a list of raft peers.
+// It ignores the given peer list in the given Config.
 // It appends a ConfChangeAddNode entry for each given peer to the initial log.
-func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage) Node {
-	c := &Config{
-		ID:            id,
-		Peers:         nil,
-		ElectionTick:  election,
-		HeartbeatTick: heartbeat,
-		Storage:       storage,
-		// TODO(xiangli): make this configurable
-		MaxSizePerMsg:   noLimit,
-		MaxInflightMsgs: 256,
-	}
+func StartNode(c *Config, peers []Peer) Node {
 	r := newRaft(c)
 	// become the follower at term 1 and apply initial configuration
 	// entires of term 1
@@ -194,18 +184,7 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage
 // The current membership of the cluster will be restored from the Storage.
 // If the caller has an existing state machine, pass in the last log index that
 // has been applied to it; otherwise use zero.
-func RestartNode(id uint64, election, heartbeat int, storage Storage, applied uint64) Node {
-	c := &Config{
-		ID:            id,
-		Peers:         nil,
-		ElectionTick:  election,
-		HeartbeatTick: heartbeat,
-		Storage:       storage,
-		Applied:       applied,
-		// TODO(xiangli): make this configurable
-		MaxSizePerMsg:   noLimit,
-		MaxInflightMsgs: 256,
-	}
+func RestartNode(c *Config) Node {
 	r := newRaft(c)
 
 	n := newNode()

+ 36 - 4
raft/node_test.go

@@ -321,7 +321,15 @@ func TestNodeStart(t *testing.T) {
 		},
 	}
 	storage := NewMemoryStorage()
-	n := StartNode(1, []Peer{{ID: 1}}, 10, 1, storage)
+	c := &Config{
+		ID:              1,
+		ElectionTick:    10,
+		HeartbeatTick:   1,
+		Storage:         storage,
+		MaxSizePerMsg:   noLimit,
+		MaxInflightMsgs: 256,
+	}
+	n := StartNode(c, []Peer{{ID: 1}})
 	n.Campaign(ctx)
 	g := <-n.Ready()
 	if !reflect.DeepEqual(g, wants[0]) {
@@ -362,7 +370,15 @@ func TestNodeRestart(t *testing.T) {
 	storage := NewMemoryStorage()
 	storage.SetHardState(st)
 	storage.Append(entries)
-	n := RestartNode(1, 10, 1, storage, 0)
+	c := &Config{
+		ID:              1,
+		ElectionTick:    10,
+		HeartbeatTick:   1,
+		Storage:         storage,
+		MaxSizePerMsg:   noLimit,
+		MaxInflightMsgs: 256,
+	}
+	n := RestartNode(c)
 	if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
 		t.Errorf("g = %+v,\n             w   %+v", g, want)
 	}
@@ -398,7 +414,15 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
 	s.SetHardState(st)
 	s.ApplySnapshot(snap)
 	s.Append(entries)
-	n := RestartNode(1, 10, 1, s, 0)
+	c := &Config{
+		ID:              1,
+		ElectionTick:    10,
+		HeartbeatTick:   1,
+		Storage:         s,
+		MaxSizePerMsg:   noLimit,
+		MaxInflightMsgs: 256,
+	}
+	n := RestartNode(c)
 	if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
 		t.Errorf("g = %+v,\n             w   %+v", g, want)
 	} else {
@@ -417,7 +441,15 @@ func TestNodeAdvance(t *testing.T) {
 	defer cancel()
 
 	storage := NewMemoryStorage()
-	n := StartNode(1, []Peer{{ID: 1}}, 10, 1, storage)
+	c := &Config{
+		ID:              1,
+		ElectionTick:    10,
+		HeartbeatTick:   1,
+		Storage:         storage,
+		MaxSizePerMsg:   noLimit,
+		MaxInflightMsgs: 256,
+	}
+	n := StartNode(c, []Peer{{ID: 1}})
 	n.Campaign(ctx)
 	<-n.Ready()
 	n.Propose(ctx, []byte("foo"))