Browse Source

Merge pull request #2104 from xiang90/timeout

etcdserver: make heartbeat/election configurable
Xiang Li 11 years ago
parent
commit
1a6161d08a

+ 8 - 0
Documentation/configuration.md

@@ -22,6 +22,14 @@ To start etcd automatically using custom settings at startup in Linux, using a [
 + Number of committed transactions to trigger a snapshot to disk.
 + default: "10000"
 
+##### -heartbeat-interval
++ Time (in milliseconds) of a heartbeat interval.
++ default: "100"
+
+##### -election-timeout
++ Time (in milliseconds) for an election to time out.
++ default: "1000"
+
 ##### -listen-peer-urls
 + List of URLs to listen on for peer traffic.
 + default: "http://localhost:2380,http://localhost:7001"

+ 8 - 0
etcdmain/config.go

@@ -77,6 +77,10 @@ type config struct {
 	maxWalFiles    uint
 	name           string
 	snapCount      uint64
+	// TODO: decouple TickMs from the heartbeat tick (the heartbeat currently
+	// fires every 1 tick) and make ticks a cluster-wide configuration.
+	TickMs     uint
+	ElectionMs uint
 
 	// clustering
 	apurls, acurls      []url.URL
@@ -137,6 +141,8 @@ func NewConfig() *config {
 	fs.UintVar(&cfg.maxWalFiles, "max-wals", defaultMaxWALs, "Maximum number of wal files to retain (0 is unlimited)")
 	fs.StringVar(&cfg.name, "name", "default", "Unique human-readable name for this node")
 	fs.Uint64Var(&cfg.snapCount, "snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
+	fs.UintVar(&cfg.TickMs, "heartbeat-interval", 100, "Time (in milliseconds) of a heartbeat interval.")
+	fs.UintVar(&cfg.ElectionMs, "election-timeout", 1000, "Time (in milliseconds) for an election to timeout.")
 
 	// clustering
 	fs.Var(flags.NewURLsValue("http://localhost:2380,http://localhost:7001"), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster")
@@ -259,3 +265,5 @@ func (cfg config) isNewCluster() bool          { return cfg.clusterState.String(
 func (cfg config) isProxy() bool               { return cfg.proxy.String() != proxyFlagOff }
 func (cfg config) isReadonlyProxy() bool       { return cfg.proxy.String() == proxyFlagReadonly }
 func (cfg config) shouldFallbackToProxy() bool { return cfg.fallback.String() == fallbackFlagProxy }
+
+func (cfg config) electionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }

+ 2 - 0
etcdmain/etcd.go

@@ -156,6 +156,8 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 		NewCluster:      cfg.isNewCluster(),
 		ForceNewCluster: cfg.forceNewCluster,
 		Transport:       pt,
+		TickMs:          cfg.TickMs,
+		ElectionTicks:   cfg.electionTicks(),
 	}
 	var s *etcdserver.EtcdServer
 	s, err = etcdserver.NewServer(srvcfg)

+ 4 - 0
etcdmain/help.go

@@ -19,6 +19,10 @@ member flags:
 		path to the data directory.
 	--snapshot-count '10000'
 		number of committed transactions to trigger a snapshot to disk.
+	--heartbeat-interval '100'
+		time (in milliseconds) of a heartbeat interval.
+	--election-timeout '1000'
+		time (in milliseconds) for an election to timeout.
 	--listen-peer-urls 'http://localhost:2380,http://localhost:7001'
 		list of URLs to listen on for peer traffic.
 	--listen-client-urls 'http://localhost:2379,http://localhost:4001'

+ 4 - 2
etcdserver/config.go

@@ -44,8 +44,8 @@ type ServerConfig struct {
 	ForceNewCluster bool
 	Transport       *http.Transport
 
-	// Only for testing purpose
-	ElectionTimeoutTicks int
+	TickMs        uint
+	ElectionTicks int
 }
 
 // VerifyBootstrapConfig sanity-checks the initial config and returns an error
@@ -100,6 +100,8 @@ func (c *ServerConfig) print(initial bool) {
 		log.Println("etcdserver: force new cluster")
 	}
 	log.Printf("etcdserver: data dir = %s", c.DataDir)
+	log.Printf("etcdserver: heartbeat = %dms", c.TickMs)
+	log.Printf("etcdserver: election = %dms", c.ElectionTicks*int(c.TickMs))
 	log.Printf("etcdserver: snapshot count = %d", c.SnapCount)
 	if len(c.DiscoveryURL) != 0 {
 		log.Printf("etcdserver: discovery URL= %s", c.DiscoveryURL)

+ 1 - 1
etcdserver/force_cluster.go

@@ -66,7 +66,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
 	}
 	s.SetHardState(st)
 	s.Append(ents)
-	n := raft.RestartNode(uint64(id), 10, 1, s)
+	n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s)
 	return id, n, s, w
 }
 

+ 3 - 11
etcdserver/server.go

@@ -265,7 +265,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		storage:     NewStorage(w, ss),
 		stats:       sstats,
 		lstats:      lstats,
-		Ticker:      time.Tick(100 * time.Millisecond),
+		Ticker:      time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
 		SyncTicker:  time.Tick(500 * time.Millisecond),
 		snapCount:   cfg.SnapCount,
 		reqIDGen:    idutil.NewGenerator(uint8(id), time.Now()),
@@ -870,12 +870,8 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *
 	}
 	id = member.ID
 	log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID())
-	election := cfg.ElectionTimeoutTicks
-	if election == 0 {
-		election = 10
-	}
 	s = raft.NewMemoryStorage()
-	n = raft.StartNode(uint64(id), peers, election, 1, s)
+	n = raft.StartNode(uint64(id), peers, cfg.ElectionTicks, 1, s)
 	return
 }
 
@@ -888,17 +884,13 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.N
 	cfg.Cluster.SetID(cid)
 
 	log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
-	election := cfg.ElectionTimeoutTicks
-	if election == 0 {
-		election = 10
-	}
 	s := raft.NewMemoryStorage()
 	if snapshot != nil {
 		s.ApplySnapshot(*snapshot)
 	}
 	s.SetHardState(st)
 	s.Append(ents)
-	n := raft.RestartNode(uint64(id), election, 1, s)
+	n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s)
 	return id, n, s, w
 }
 

+ 2 - 2
integration/cluster_test.go

@@ -483,7 +483,7 @@ func mustNewMember(t *testing.T, name string) *member {
 	}
 	m.NewCluster = true
 	m.Transport = mustNewTransport(t)
-	m.ElectionTimeoutTicks = electionTicks
+	m.ElectionTicks = electionTicks
 	return m
 }
 
@@ -513,7 +513,7 @@ func (m *member) Clone(t *testing.T) *member {
 		panic(err)
 	}
 	mm.Transport = mustNewTransport(t)
-	mm.ElectionTimeoutTicks = m.ElectionTimeoutTicks
+	mm.ElectionTicks = m.ElectionTicks
 	return mm
 }