瀏覽代碼

etcdserver: adjust commit timeout based on config

It uses heartbeat interval and election timeout to estimate the
commit timeout for internal requests.

This PR helps etcd survive under high roundtrip-time environment,
e.g., globally-deployed cluster.
Yicheng Qin 10 年之前
父節點
當前提交
5a91937367
共有 4 個文件被更改,包括 23 次插入8 次删除
  1. 9 0
      etcdserver/config.go
  2. 1 1
      etcdserver/raft.go
  3. 3 7
      etcdserver/server.go
  4. 10 0
      etcdserver/server_test.go

+ 9 - 0
etcdserver/config.go

@@ -20,6 +20,7 @@ import (
 	"path"
 	"reflect"
 	"sort"
+	"time"
 
 	"github.com/coreos/etcd/pkg/types"
 )
@@ -110,6 +111,14 @@ func (c *ServerConfig) SnapDir() string { return path.Join(c.MemberDir(), "snap"
 
 func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" }
 
+// CommitTimeout returns commit timeout under normal case.
+func (c *ServerConfig) CommitTimeout() time.Duration {
+	// We assume that heartbeat >= TTL.
+	// 5s for queue waiting, computation and disk IO delay
+	// + 2 * heartbeat(TTL) for expected time between proposal by follower and commit at the follower
+	return 5*time.Second + 2*time.Duration(c.TickMs)*time.Millisecond
+}
+
 func (c *ServerConfig) PrintWithInitial() { c.print(true) }
 
 func (c *ServerConfig) Print() { c.print(false) }

+ 1 - 1
etcdserver/raft.go

@@ -175,7 +175,7 @@ func (r *raftNode) start(s *EtcdServer) {
 				}
 				r.Advance()
 			case <-syncC:
-				r.s.sync(defaultSyncTimeout)
+				r.s.sync(r.s.cfg.CommitTimeout())
 			case <-r.stopped:
 				return
 			}

+ 3 - 7
etcdserver/server.go

@@ -54,17 +54,13 @@ const (
 	// owner can make/remove files inside the directory
 	privateDirMode = 0700
 
-	defaultSyncTimeout = time.Second
-	DefaultSnapCount   = 10000
-	// TODO: calculate based on heartbeat interval
-	defaultPublishTimeout = 5 * time.Second
+	DefaultSnapCount = 10000
 
 	StoreClusterPrefix = "/0"
 	StoreKeysPrefix    = "/1"
 
 	purgeFileInterval      = 30 * time.Second
 	monitorVersionInterval = 5 * time.Second
-	versionUpdateTimeout   = 1 * time.Second
 )
 
 var (
@@ -347,7 +343,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 // It also starts a goroutine to publish its server information.
 func (s *EtcdServer) Start() {
 	s.start()
-	go s.publish(defaultPublishTimeout)
+	go s.publish(s.cfg.CommitTimeout())
 	go s.purgeFile()
 	go monitorFileDescriptor(s.done)
 	go s.monitorVersions()
@@ -1005,7 +1001,7 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
 		Path:   path.Join(StoreClusterPrefix, "version"),
 		Val:    ver,
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), versionUpdateTimeout)
+	ctx, cancel := context.WithTimeout(context.Background(), s.cfg.CommitTimeout())
 	_, err := s.Do(ctx, req)
 	cancel()
 	switch err {

+ 10 - 0
etcdserver/server_test.go

@@ -517,6 +517,7 @@ func TestDoProposal(t *testing.T) {
 	for i, tt := range tests {
 		st := &storeRecorder{}
 		srv := &EtcdServer{
+			cfg: &ServerConfig{TickMs: 1},
 			r: raftNode{
 				Node:        newNodeCommitter(),
 				storage:     &storageRecorder{},
@@ -547,6 +548,7 @@ func TestDoProposal(t *testing.T) {
 func TestDoProposalCancelled(t *testing.T) {
 	wait := &waitRecorder{}
 	srv := &EtcdServer{
+		cfg:      &ServerConfig{TickMs: 1},
 		r:        raftNode{Node: &nodeRecorder{}},
 		w:        wait,
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -566,6 +568,7 @@ func TestDoProposalCancelled(t *testing.T) {
 
 func TestDoProposalTimeout(t *testing.T) {
 	srv := &EtcdServer{
+		cfg:      &ServerConfig{TickMs: 1},
 		r:        raftNode{Node: &nodeRecorder{}},
 		w:        &waitRecorder{},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -579,6 +582,7 @@ func TestDoProposalTimeout(t *testing.T) {
 
 func TestDoProposalStopped(t *testing.T) {
 	srv := &EtcdServer{
+		cfg:      &ServerConfig{TickMs: 1},
 		r:        raftNode{Node: &nodeRecorder{}},
 		w:        &waitRecorder{},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -653,6 +657,7 @@ func TestSyncTrigger(t *testing.T) {
 	n := newReadyNode()
 	st := make(chan time.Time, 1)
 	srv := &EtcdServer{
+		cfg: &ServerConfig{TickMs: 1},
 		r: raftNode{
 			Node:        n,
 			raftStorage: raft.NewMemoryStorage(),
@@ -733,6 +738,7 @@ func TestTriggerSnap(t *testing.T) {
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	srv := &EtcdServer{
+		cfg:       &ServerConfig{TickMs: 1},
 		snapCount: uint64(snapc),
 		r: raftNode{
 			Node:        newNodeCommitter(),
@@ -965,6 +971,7 @@ func TestPublish(t *testing.T) {
 	ch <- Response{}
 	w := &waitWithResponse{ch: ch}
 	srv := &EtcdServer{
+		cfg:        &ServerConfig{TickMs: 1},
 		id:         1,
 		r:          raftNode{Node: n},
 		attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
@@ -1006,6 +1013,7 @@ func TestPublish(t *testing.T) {
 // TestPublishStopped tests that publish will be stopped if server is stopped.
 func TestPublishStopped(t *testing.T) {
 	srv := &EtcdServer{
+		cfg: &ServerConfig{TickMs: 1},
 		r: raftNode{
 			Node:      &nodeRecorder{},
 			transport: &nopTransporter{},
@@ -1024,6 +1032,7 @@ func TestPublishStopped(t *testing.T) {
 func TestPublishRetry(t *testing.T) {
 	n := &nodeRecorder{}
 	srv := &EtcdServer{
+		cfg:      &ServerConfig{TickMs: 1},
 		r:        raftNode{Node: n},
 		w:        &waitRecorder{},
 		done:     make(chan struct{}),
@@ -1047,6 +1056,7 @@ func TestUpdateVersion(t *testing.T) {
 	w := &waitWithResponse{ch: ch}
 	srv := &EtcdServer{
 		id:         1,
+		cfg:        &ServerConfig{TickMs: 1},
 		r:          raftNode{Node: n},
 		attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
 		cluster:    &cluster{},