Ver código fonte

Merge pull request #3190 from yichengq/adjust-prop-timeout

etcdserver: adjust proposal timeout based on config
Xiang Li 10 anos atrás
pai
commit
e894756144

+ 1 - 1
etcdmain/etcd.go

@@ -275,7 +275,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 		plog.Infof("cors = %s", cfg.corsInfo)
 	}
 	ch := &cors.CORSHandler{
-		Handler: etcdhttp.NewClientHandler(s),
+		Handler: etcdhttp.NewClientHandler(s, srvcfg.ReqTimeout()),
 		Info:    cfg.corsInfo,
 	}
 	ph := etcdhttp.NewPeerHandler(s.Cluster(), s.RaftHandler())

+ 16 - 0
etcdserver/config.go

@@ -20,6 +20,7 @@ import (
 	"path"
 	"reflect"
 	"sort"
+	"time"
 
 	"github.com/coreos/etcd/pkg/types"
 )
@@ -110,6 +111,21 @@ func (c *ServerConfig) SnapDir() string { return path.Join(c.MemberDir(), "snap"
 
 func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" }
 
+// ReqTimeout returns the timeout allowed for a request to finish.
+func (c *ServerConfig) ReqTimeout() time.Duration {
+	// CommitTimeout
+	// + 2 * election timeout for possible leader election
+	return c.CommitTimeout() + 2*time.Duration(c.ElectionTicks)*time.Duration(c.TickMs)*time.Millisecond
+}
+
+// CommitTimeout returns the commit timeout under normal conditions.
+func (c *ServerConfig) CommitTimeout() time.Duration {
+	// We assume that heartbeat >= TTL.
+	// 5s for queue waiting, computation and disk IO delay
+	// + 2 * heartbeat(TTL) for expected time between proposal by follower and commit at the follower
+	return 5*time.Second + 2*time.Duration(c.TickMs)*time.Millisecond
+}
+
 func (c *ServerConfig) PrintWithInitial() { c.print(true) }
 
 func (c *ServerConfig) Print() { c.print(false) }

+ 6 - 4
etcdserver/etcdhttp/client.go

@@ -57,17 +57,17 @@ const (
 )
 
 // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
-func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
+func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http.Handler {
 	go capabilityLoop(server)
 
-	sec := auth.NewStore(server, defaultServerTimeout)
+	sec := auth.NewStore(server, timeout)
 
 	kh := &keysHandler{
 		sec:     sec,
 		server:  server,
 		cluster: server.Cluster(),
 		timer:   server,
-		timeout: defaultServerTimeout,
+		timeout: timeout,
 	}
 
 	sh := &statsHandler{
@@ -78,6 +78,7 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
 		sec:     sec,
 		server:  server,
 		cluster: server.Cluster(),
+		timeout: timeout,
 		clock:   clockwork.NewRealClock(),
 	}
 
@@ -176,6 +177,7 @@ type membersHandler struct {
 	sec     auth.Store
 	server  etcdserver.Server
 	cluster etcdserver.Cluster
+	timeout time.Duration
 	clock   clockwork.Clock
 }
 
@@ -189,7 +191,7 @@ func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 	w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
 
-	ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout)
+	ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
 	defer cancel()
 
 	switch r.Method {

+ 0 - 6
etcdserver/etcdhttp/http.go

@@ -28,12 +28,6 @@ import (
 )
 
 const (
-	// time to wait for response from EtcdServer requests
-	// 5s for disk and network delay + 10*heartbeat for commit and possible
-	// leader switch
-	// TODO: use heartbeat set in etcdserver
-	defaultServerTimeout = 5*time.Second + 10*(100*time.Millisecond)
-
 	// time to wait for a Watch request
 	defaultWatchTimeout = time.Duration(math.MaxInt64)
 )

+ 1 - 1
etcdserver/raft.go

@@ -175,7 +175,7 @@ func (r *raftNode) start(s *EtcdServer) {
 				}
 				r.Advance()
 			case <-syncC:
-				r.s.sync(defaultSyncTimeout)
+				r.s.sync(r.s.cfg.CommitTimeout())
 			case <-r.stopped:
 				return
 			}

+ 3 - 7
etcdserver/server.go

@@ -54,17 +54,13 @@ const (
 	// owner can make/remove files inside the directory
 	privateDirMode = 0700
 
-	defaultSyncTimeout = time.Second
-	DefaultSnapCount   = 10000
-	// TODO: calculate based on heartbeat interval
-	defaultPublishTimeout = 5 * time.Second
+	DefaultSnapCount = 10000
 
 	StoreClusterPrefix = "/0"
 	StoreKeysPrefix    = "/1"
 
 	purgeFileInterval      = 30 * time.Second
 	monitorVersionInterval = 5 * time.Second
-	versionUpdateTimeout   = 1 * time.Second
 )
 
 var (
@@ -347,7 +343,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 // It also starts a goroutine to publish its server information.
 func (s *EtcdServer) Start() {
 	s.start()
-	go s.publish(defaultPublishTimeout)
+	go s.publish(s.cfg.CommitTimeout())
 	go s.purgeFile()
 	go monitorFileDescriptor(s.done)
 	go s.monitorVersions()
@@ -1005,7 +1001,7 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
 		Path:   path.Join(StoreClusterPrefix, "version"),
 		Val:    ver,
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), versionUpdateTimeout)
+	ctx, cancel := context.WithTimeout(context.Background(), s.cfg.CommitTimeout())
 	_, err := s.Do(ctx, req)
 	cancel()
 	switch err {

+ 10 - 0
etcdserver/server_test.go

@@ -517,6 +517,7 @@ func TestDoProposal(t *testing.T) {
 	for i, tt := range tests {
 		st := &storeRecorder{}
 		srv := &EtcdServer{
+			cfg: &ServerConfig{TickMs: 1},
 			r: raftNode{
 				Node:        newNodeCommitter(),
 				storage:     &storageRecorder{},
@@ -547,6 +548,7 @@ func TestDoProposal(t *testing.T) {
 func TestDoProposalCancelled(t *testing.T) {
 	wait := &waitRecorder{}
 	srv := &EtcdServer{
+		cfg:      &ServerConfig{TickMs: 1},
 		r:        raftNode{Node: &nodeRecorder{}},
 		w:        wait,
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -566,6 +568,7 @@ func TestDoProposalCancelled(t *testing.T) {
 
 func TestDoProposalTimeout(t *testing.T) {
 	srv := &EtcdServer{
+		cfg:      &ServerConfig{TickMs: 1},
 		r:        raftNode{Node: &nodeRecorder{}},
 		w:        &waitRecorder{},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -579,6 +582,7 @@ func TestDoProposalTimeout(t *testing.T) {
 
 func TestDoProposalStopped(t *testing.T) {
 	srv := &EtcdServer{
+		cfg:      &ServerConfig{TickMs: 1},
 		r:        raftNode{Node: &nodeRecorder{}},
 		w:        &waitRecorder{},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -653,6 +657,7 @@ func TestSyncTrigger(t *testing.T) {
 	n := newReadyNode()
 	st := make(chan time.Time, 1)
 	srv := &EtcdServer{
+		cfg: &ServerConfig{TickMs: 1},
 		r: raftNode{
 			Node:        n,
 			raftStorage: raft.NewMemoryStorage(),
@@ -733,6 +738,7 @@ func TestTriggerSnap(t *testing.T) {
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	srv := &EtcdServer{
+		cfg:       &ServerConfig{TickMs: 1},
 		snapCount: uint64(snapc),
 		r: raftNode{
 			Node:        newNodeCommitter(),
@@ -965,6 +971,7 @@ func TestPublish(t *testing.T) {
 	ch <- Response{}
 	w := &waitWithResponse{ch: ch}
 	srv := &EtcdServer{
+		cfg:        &ServerConfig{TickMs: 1},
 		id:         1,
 		r:          raftNode{Node: n},
 		attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
@@ -1006,6 +1013,7 @@ func TestPublish(t *testing.T) {
 // TestPublishStopped tests that publish will be stopped if server is stopped.
 func TestPublishStopped(t *testing.T) {
 	srv := &EtcdServer{
+		cfg: &ServerConfig{TickMs: 1},
 		r: raftNode{
 			Node:      &nodeRecorder{},
 			transport: &nopTransporter{},
@@ -1024,6 +1032,7 @@ func TestPublishStopped(t *testing.T) {
 func TestPublishRetry(t *testing.T) {
 	n := &nodeRecorder{}
 	srv := &EtcdServer{
+		cfg:      &ServerConfig{TickMs: 1},
 		r:        raftNode{Node: n},
 		w:        &waitRecorder{},
 		done:     make(chan struct{}),
@@ -1047,6 +1056,7 @@ func TestUpdateVersion(t *testing.T) {
 	w := &waitWithResponse{ch: ch}
 	srv := &EtcdServer{
 		id:         1,
+		cfg:        &ServerConfig{TickMs: 1},
 		r:          raftNode{Node: n},
 		attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
 		cluster:    &cluster{},

+ 1 - 1
integration/cluster_test.go

@@ -757,7 +757,7 @@ func (m *member) Launch() error {
 	for _, ln := range m.ClientListeners {
 		hs := &httptest.Server{
 			Listener: ln,
-			Config:   &http.Server{Handler: etcdhttp.NewClientHandler(m.s)},
+			Config:   &http.Server{Handler: etcdhttp.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())},
 		}
 		hs.Start()
 		m.hss = append(m.hss, hs)