Browse Source

Merge branch 'feature-parametric-timeout' of https://github.com/neildunbar/etcd into neildunbar-feature-parametric-timeout

Ben Johnson 12 years ago
parent
commit
aabd0faebe
8 changed files with 51 additions and 40 deletions
  1. 1 0
      Dockerfile
  2. 1 1
      etcd.go
  3. 7 1
      server/config.go
  4. 7 3
      server/peer_server.go
  5. 3 7
      server/timeout.go
  6. 16 19
      server/transporter.go
  7. 8 4
      server/usage.go
  8. 8 5
      tests/server_utils.go

+ 1 - 0
Dockerfile

@@ -8,3 +8,4 @@ ADD . /opt/etcd
 RUN cd /opt/etcd && ./build
 EXPOSE 4001 7001
 ENTRYPOINT ["/opt/etcd/etcd"]
+

+ 1 - 1
etcd.go

@@ -82,7 +82,7 @@ func main() {
 	registry := server.NewRegistry(store)
 
 	// Create peer server.
-	ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount)
+	ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount, config.HeartbeatTimeout, config.ElectionTimeout)
 	ps.MaxClusterSize = config.MaxClusterSize
 	ps.RetryTimes = config.MaxRetryAttempts
 

+ 7 - 1
server/config.go

@@ -67,7 +67,8 @@ type Config struct {
 	ShowVersion      bool
 	Verbose          bool `toml:"verbose" env:"ETCD_VERBOSE"`
 	VeryVerbose      bool `toml:"very_verbose" env:"ETCD_VERY_VERBOSE"`
-
+	HeartbeatTimeout int  `toml:"peer_heartbeat_timeout" env:"ETCD_PEER_HEARTBEAT_TIMEOUT"`
+	ElectionTimeout  int  `toml:"peer_election_timeout" env:"ETCD_PEER_ELECTION_TIMEOUT"`
 	Peer struct {
 		Addr     string `toml:"addr" env:"ETCD_PEER_ADDR"`
 		BindAddr string `toml:"bind_addr" env:"ETCD_PEER_BIND_ADDR"`
@@ -87,6 +88,8 @@ func NewConfig() *Config {
 	c.MaxRetryAttempts = 3
 	c.Peer.Addr = "127.0.0.1:7001"
 	c.SnapshotCount = 10000
+	c.HeartbeatTimeout = HeartbeatTimeout
+	c.ElectionTimeout = ElectionTimeout
 	return c
 }
 
@@ -233,6 +236,9 @@ func (c *Config) LoadFlags(arguments []string) error {
 	f.IntVar(&c.MaxResultBuffer, "max-result-buffer", c.MaxResultBuffer, "")
 	f.IntVar(&c.MaxRetryAttempts, "max-retry-attempts", c.MaxRetryAttempts, "")
 	f.IntVar(&c.MaxClusterSize, "max-cluster-size", c.MaxClusterSize, "")
+	f.IntVar(&c.HeartbeatTimeout, "peer-heartbeat-timeout", c.HeartbeatTimeout, "")
+	f.IntVar(&c.ElectionTimeout, "peer-election-timeout", c.ElectionTimeout, "")
+
 	f.StringVar(&cors, "cors", "", "")
 
 	f.BoolVar(&c.Snapshot, "snapshot", c.Snapshot, "")

+ 7 - 3
server/peer_server.go

@@ -38,6 +38,8 @@ type PeerServer struct {
 	snapConf       *snapshotConf
 	MaxClusterSize int
 	RetryTimes     int
+	heartbeatTimeout int
+	electionTimeout  int
 }
 
 // TODO: find a good policy to do snapshot
@@ -53,7 +55,7 @@ type snapshotConf struct {
 	writesThr uint64
 }
 
-func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int) *PeerServer {
+func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int, heartbeatTimeout int, electionTimeout int) *PeerServer {
 	s := &PeerServer{
 		name:     name,
 		url:      url,
@@ -76,6 +78,8 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
 				back: -1,
 			},
 		},
+		heartbeatTimeout: heartbeatTimeout,
+		electionTimeout: electionTimeout,
 	}
 
 	// Create transporter for raft
@@ -105,8 +109,8 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
 		}
 	}
 
-	s.raftServer.SetElectionTimeout(ElectionTimeout)
-	s.raftServer.SetHeartbeatTimeout(HeartbeatTimeout)
+	s.raftServer.SetElectionTimeout(time.Duration(s.electionTimeout) * time.Millisecond)
+	s.raftServer.SetHeartbeatTimeout(time.Duration(s.heartbeatTimeout) * time.Millisecond)
 
 	s.raftServer.Start()
 

+ 3 - 7
server/timeout.go

@@ -1,15 +1,11 @@
 package server
 
-import (
-	"time"
-)
-
 const (
-	// The amount of time to elapse without a heartbeat before becoming a candidate.
-	ElectionTimeout = 200 * time.Millisecond
+	// The amount of time (ms) to elapse without a heartbeat before becoming a candidate.
+	ElectionTimeout = 200
 
 	// The frequency by which heartbeats are sent to followers.
-	HeartbeatTimeout = 50 * time.Millisecond
+	HeartbeatTimeout = 50
 
 	RetryInterval = 10
 )

+ 16 - 19
server/transporter.go

@@ -13,35 +13,30 @@ import (
 	"github.com/coreos/raft"
 )
 
-// Timeout for setup internal raft http connection
-// This should not exceed 3 * RTT
-var dailTimeout = 3 * HeartbeatTimeout
-
-// Timeout for setup internal raft http connection + receive all post body
-// The raft server will not send back response header until it received all the
-// post body.
-// This should not exceed dailTimeout + electionTimeout
-var responseHeaderTimeout = 3*HeartbeatTimeout + ElectionTimeout
-
-// Timeout for receiving the response body from the server
-// This should not exceed heartbeatTimeout
-var tranTimeout = HeartbeatTimeout
-
 // Transporter layer for communication between raft nodes
 type transporter struct {
 	client     *http.Client
 	transport  *http.Transport
 	peerServer *PeerServer
+	tranTimeout time.Duration
 }
 
+type dialer func(network, addr string) (net.Conn, error)
+
 // Create transporter using by raft server
 // Create http or https transporter based on
 // whether the user give the server cert and key
 func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter {
+	// names for each type of timeout, for the sake of clarity
+	dialTimeout := time.Duration(3 * peerServer.heartbeatTimeout + peerServer.electionTimeout) * time.Millisecond
+	responseHeaderTimeout := time.Duration(3 * peerServer.heartbeatTimeout + peerServer.electionTimeout) * time.Millisecond
+
 	t := transporter{}
 
+	t.tranTimeout = time.Duration(peerServer.heartbeatTimeout) * time.Millisecond
+
 	tr := &http.Transport{
-		Dial: dialWithTimeout,
+		Dial: dialWithTimeoutFactory(dialTimeout),
 		ResponseHeaderTimeout: responseHeaderTimeout,
 	}
 
@@ -57,9 +52,11 @@ func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *
 	return &t
 }
 
-// Dial with timeout
-func dialWithTimeout(network, addr string) (net.Conn, error) {
-	return net.DialTimeout(network, addr, dailTimeout)
+// factory function to return a dialer
+func dialWithTimeoutFactory( timeout time.Duration ) dialer {
+	return func(network, addr string) (net.Conn, error) {
+		return net.DialTimeout(network, addr, timeout)
+	}
 }
 
 // Sends AppendEntries RPCs to a peer when the server is the leader.
@@ -238,7 +235,7 @@ func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error)
 // Cancel the on fly HTTP transaction when timeout happens.
 func (t *transporter) CancelWhenTimeout(req *http.Request) {
 	go func() {
-		time.Sleep(tranTimeout)
+		time.Sleep(t.tranTimeout)
 		t.transport.CancelRequest(req)
 	}()
 }

+ 8 - 4
server/usage.go

@@ -38,11 +38,15 @@ Client Communication Options:
   -key-file=<path>          Path to the client key file.
 
 Peer Communication Options:
-  -peer-addr=<host:port>         The public host:port used for peer communication.
+  -peer-addr=<host:port>  The public host:port used for peer communication.
   -peer-bind-addr=<host[:port]>  The listening host:port used for peer communication.
-  -peer-ca-file=<path>           Path to the peer CA file.
-  -peer-cert-file=<path>         Path to the peer cert file.
-  -peer-key-file=<path>          Path to the peer key file.
+  -peer-ca-file=<path>    Path to the peer CA file.
+  -peer-cert-file=<path>  Path to the peer cert file.
+  -peer-key-file=<path>   Path to the peer key file.
+  -peer-heartbeat-timeout=<time>
+                          Time (in milliseconds) for a heartbeat to timeout
+  -peer-election-timeout=<time>
+                          Time (in milliseconds) for an election to timeout
 
 Other Options:
   -max-result-buffer   Max size of the result buffer.

+ 8 - 5
tests/server_utils.go

@@ -10,10 +10,12 @@ import (
 )
 
 const (
-	testName          = "ETCDTEST"
-	testClientURL     = "localhost:4401"
-	testRaftURL       = "localhost:7701"
-	testSnapshotCount = 10000
+	testName             = "ETCDTEST"
+	testClientURL        = "localhost:4401"
+	testRaftURL          = "localhost:7701"
+	testSnapshotCount    = 10000
+	testHeartbeatTimeout = 50
+	testElectionTimeout  = 200
 )
 
 // Starts a server in a temporary directory.
@@ -23,7 +25,8 @@ func RunServer(f func(*server.Server)) {
 
 	store := store.New()
 	registry := server.NewRegistry(store)
-	ps := server.NewPeerServer(testName, path, "http://" + testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount)
+
+	ps := server.NewPeerServer(testName, path, "http://" + testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount, testHeartbeatTimeout, testElectionTimeout)
 	ps.MaxClusterSize = 9
 	s := server.New(testName, "http://" + testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store)
 	ps.SetServer(s)