Browse Source

Added the ability to specify heartbeat and election timeouts as
config parameters.

Neil Dunbar 12 years ago
parent
commit
46f8a354d1
8 changed files with 39 additions and 31 deletions
  1. 1 1
      Dockerfile
  2. 1 1
      etcd.go
  3. 5 1
      server/config.go
  4. 7 3
      server/peer_server.go
  5. 1 0
      server/server.go
  6. 7 5
      server/timeout.go
  7. 13 20
      server/transporter.go
  8. 4 0
      server/usage.go

+ 1 - 1
Dockerfile

@@ -7,4 +7,4 @@ RUN apt-get install -y golang
 ADD . /opt/etcd
 RUN cd /opt/etcd && ./build
 EXPOSE 4001 7001
-ENTRYPOINT ["/opt/etcd/etcd", "-addr", "0.0.0.0:4001", "-bind-addr", "0.0.0.0:7001"]
+ENTRYPOINT ["/datastore/start.sh"]

+ 1 - 1
etcd.go

@@ -92,7 +92,7 @@ func main() {
 	registry := server.NewRegistry(store)
 
 	// Create peer server.
-	ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount)
+	ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount, config.HeartbeatTimeout, config.ElectionTimeout)
 	ps.MaxClusterSize = config.MaxClusterSize
 	ps.RetryTimes = config.MaxRetryAttempts
 

+ 5 - 1
server/config.go

@@ -67,7 +67,8 @@ type Config struct {
 	ShowVersion      bool
 	Verbose          bool `toml:"verbose" env:"ETCD_VERBOSE"`
 	VeryVerbose      bool `toml:"very_verbose" env:"ETCD_VERY_VERBOSE"`
-
+	HeartbeatTimeout int
+	ElectionTimeout  int
 	Peer struct {
 		Addr     string `toml:"addr" env:"ETCD_PEER_ADDR"`
 		BindAddr string `toml:"bind_addr" env:"ETCD_PEER_BIND_ADDR"`
@@ -228,6 +229,9 @@ func (c *Config) LoadFlags(arguments []string) error {
 	f.IntVar(&c.MaxResultBuffer, "max-result-buffer", c.MaxResultBuffer, "")
 	f.IntVar(&c.MaxRetryAttempts, "max-retry-attempts", c.MaxRetryAttempts, "")
 	f.IntVar(&c.MaxClusterSize, "max-cluster-size", c.MaxClusterSize, "")
+	f.IntVar(&c.HeartbeatTimeout, "peer-heartbeat-timeout", HeartbeatTimeout, "")
+	f.IntVar(&c.ElectionTimeout, "peer-election-timeout", ElectionTimeout, "")
+
 	f.StringVar(&cors, "cors", "", "")
 
 	f.BoolVar(&c.Snapshot, "snapshot", c.Snapshot, "")

+ 7 - 3
server/peer_server.go

@@ -38,6 +38,8 @@ type PeerServer struct {
 	snapConf       *snapshotConf
 	MaxClusterSize int
 	RetryTimes     int
+	heartbeatTimeout int
+	electionTimeout  int
 }
 
 // TODO: find a good policy to do snapshot
@@ -53,7 +55,7 @@ type snapshotConf struct {
 	writesThr uint64
 }
 
-func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int) *PeerServer {
+func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int, heartbeatTimeout int, electionTimeout int) *PeerServer {
 	s := &PeerServer{
 		name:     name,
 		url:      url,
@@ -76,6 +78,8 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
 				back: -1,
 			},
 		},
+		heartbeatTimeout: heartbeatTimeout,
+		electionTimeout: electionTimeout,
 	}
 
 	// Create transporter for raft
@@ -105,8 +109,8 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
 		}
 	}
 
-	s.raftServer.SetElectionTimeout(ElectionTimeout)
-	s.raftServer.SetHeartbeatTimeout(HeartbeatTimeout)
+	s.raftServer.SetElectionTimeout(time.Duration(s.electionTimeout) * time.Millisecond)
+	s.raftServer.SetHeartbeatTimeout(time.Duration(s.heartbeatTimeout) * time.Millisecond)
 
 	s.raftServer.Start()
 

+ 1 - 0
server/server.go

@@ -227,6 +227,7 @@ func (s *Server) listenAndServeTLS(certFile, keyFile string) error {
 
 	tlsListener := tls.NewListener(conn, config)
 	s.listener = tlsListener
+	log.Debugf("etcd listening using tls key %s, cert %s", keyFile, certFile)
 	return s.Server.Serve(tlsListener)
 }
 

+ 7 - 5
server/timeout.go

@@ -1,15 +1,17 @@
 package server
 
-import (
-	"time"
-)
+// import (
+// 	"time"
+// )
 
 const (
 	// The amount of time to elapse without a heartbeat before becoming a candidate.
-	ElectionTimeout = 200 * time.Millisecond
+	// ElectionTimeout = 200 * time.Millisecond
+	ElectionTimeout = 200
 
 	// The frequency by which heartbeats are sent to followers.
-	HeartbeatTimeout = 50 * time.Millisecond
+	//HeartbeatTimeout = 50 * time.Millisecond
+	HeartbeatTimeout = 50
 
 	RetryInterval = 10
 )

+ 13 - 20
server/transporter.go

@@ -13,36 +13,27 @@ import (
 	"github.com/coreos/raft"
 )
 
-// Timeout for setup internal raft http connection
-// This should not exceed 3 * RTT
-var dailTimeout = 3 * HeartbeatTimeout
-
-// Timeout for setup internal raft http connection + receive all post body
-// The raft server will not send back response header until it received all the
-// post body.
-// This should not exceed dailTimeout + electionTimeout
-var responseHeaderTimeout = 3*HeartbeatTimeout + ElectionTimeout
-
-// Timeout for receiving the response body from the server
-// This should not exceed heartbeatTimeout
-var tranTimeout = HeartbeatTimeout
-
 // Transporter layer for communication between raft nodes
 type transporter struct {
 	client     *http.Client
 	transport  *http.Transport
 	peerServer *PeerServer
+	tranTimeout time.Duration
 }
 
+type dialer func(network, addr string) (net.Conn, error)
+
 // Create transporter using by raft server
 // Create http or https transporter based on
 // whether the user give the server cert and key
 func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter {
 	t := transporter{}
 
+	t.tranTimeout = time.Duration(peerServer.heartbeatTimeout) * time.Millisecond
+
 	tr := &http.Transport{
-		Dial: dialWithTimeout,
-		ResponseHeaderTimeout: responseHeaderTimeout,
+		Dial: dialWithTimeoutFactory( time.Duration(3 * peerServer.heartbeatTimeout + peerServer.electionTimeout) * time.Millisecond),
+		ResponseHeaderTimeout: time.Duration(3 * peerServer.heartbeatTimeout + peerServer.electionTimeout) * time.Millisecond,
 	}
 
 	if scheme == "https" {
@@ -57,9 +48,11 @@ func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *
 	return &t
 }
 
-// Dial with timeout
-func dialWithTimeout(network, addr string) (net.Conn, error) {
-	return net.DialTimeout(network, addr, dailTimeout)
+// factory function to return a dialer
+func dialWithTimeoutFactory( timeout time.Duration ) dialer {
+	return func(network, addr string) (net.Conn, error) {
+		return net.DialTimeout(network, addr, timeout)
+	}
 }
 
 // Sends AppendEntries RPCs to a peer when the server is the leader.
@@ -238,7 +231,7 @@ func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error)
 // Cancel the on fly HTTP transaction when timeout happens.
 func (t *transporter) CancelWhenTimeout(req *http.Request) {
 	go func() {
-		time.Sleep(tranTimeout)
+		time.Sleep(t.tranTimeout)
 		t.transport.CancelRequest(req)
 	}()
 }

+ 4 - 0
server/usage.go

@@ -43,6 +43,10 @@ Peer Communication Options:
   -peer-ca-file=<path>    Path to the peer CA file.
   -peer-cert-file=<path>  Path to the peer cert file.
   -peer-key-file=<path>   Path to the peer key file.
+  -peer-heartbeat-timeout=<time>
+                          Time (in milliseconds) for a heartbeat to timeout
+  -peer-election-timeout=<time>
+                          Time (in milliseconds) for an election to timeout
 
 Other Options:
   -max-result-buffer   Max size of the result buffer.