Browse Source

Timeout refactor.

Ben Johnson 12 years ago
parent
commit
8442e7a0dc
7 changed files with 40 additions and 35 deletions
  1. 8 1
      etcd.go
  2. 2 2
      server/config.go
  3. 11 9
      server/peer_server.go
  4. 8 6
      server/timeout.go
  5. 6 14
      server/transporter.go
  6. 2 2
      server/usage.go
  7. 3 1
      tests/server_utils.go

+ 8 - 1
etcd.go

@@ -19,6 +19,7 @@ package main
 import (
 import (
 	"fmt"
 	"fmt"
 	"os"
 	"os"
+	"time"
 
 
 	"github.com/coreos/etcd/log"
 	"github.com/coreos/etcd/log"
 	"github.com/coreos/etcd/server"
 	"github.com/coreos/etcd/server"
@@ -82,9 +83,15 @@ func main() {
 	registry := server.NewRegistry(store)
 	registry := server.NewRegistry(store)
 
 
 	// Create peer server.
 	// Create peer server.
-	ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount, config.HeartbeatTimeout, config.ElectionTimeout)
+	ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount)
 	ps.MaxClusterSize = config.MaxClusterSize
 	ps.MaxClusterSize = config.MaxClusterSize
 	ps.RetryTimes = config.MaxRetryAttempts
 	ps.RetryTimes = config.MaxRetryAttempts
+	if config.HeartbeatTimeout > 0 {
+		ps.HeartbeatTimeout = time.Duration(config.HeartbeatTimeout) * time.Millisecond
+	}
+	if config.ElectionTimeout > 0 {
+		ps.ElectionTimeout = time.Duration(config.ElectionTimeout) * time.Millisecond
+	}
 
 
 	// Create client server.
 	// Create client server.
 	s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &tlsConfig, &info.EtcdTLS, ps, registry, store)
 	s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &tlsConfig, &info.EtcdTLS, ps, registry, store)

+ 2 - 2
server/config.go

@@ -88,8 +88,8 @@ func NewConfig() *Config {
 	c.MaxRetryAttempts = 3
 	c.MaxRetryAttempts = 3
 	c.Peer.Addr = "127.0.0.1:7001"
 	c.Peer.Addr = "127.0.0.1:7001"
 	c.SnapshotCount = 10000
 	c.SnapshotCount = 10000
-	c.HeartbeatTimeout = HeartbeatTimeout
-	c.ElectionTimeout = ElectionTimeout
+	c.ElectionTimeout = 0
+	c.HeartbeatTimeout = 0
 	return c
 	return c
 }
 }
 
 

+ 11 - 9
server/peer_server.go

@@ -20,6 +20,8 @@ import (
 	"github.com/gorilla/mux"
 	"github.com/gorilla/mux"
 )
 )
 
 
+const retryInterval = 10
+
 type PeerServer struct {
 type PeerServer struct {
 	raftServer     raft.Server
 	raftServer     raft.Server
 	server         *Server
 	server         *Server
@@ -38,8 +40,8 @@ type PeerServer struct {
 	snapConf       *snapshotConf
 	snapConf       *snapshotConf
 	MaxClusterSize int
 	MaxClusterSize int
 	RetryTimes     int
 	RetryTimes     int
-	heartbeatTimeout int
-	electionTimeout  int
+	HeartbeatTimeout time.Duration
+	ElectionTimeout  time.Duration
 }
 }
 
 
 // TODO: find a good policy to do snapshot
 // TODO: find a good policy to do snapshot
@@ -55,7 +57,7 @@ type snapshotConf struct {
 	writesThr uint64
 	writesThr uint64
 }
 }
 
 
-func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int, heartbeatTimeout int, electionTimeout int) *PeerServer {
+func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int) *PeerServer {
 	s := &PeerServer{
 	s := &PeerServer{
 		name:     name,
 		name:     name,
 		url:      url,
 		url:      url,
@@ -78,8 +80,8 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
 				back: -1,
 				back: -1,
 			},
 			},
 		},
 		},
-		heartbeatTimeout: heartbeatTimeout,
-		electionTimeout: electionTimeout,
+		HeartbeatTimeout: defaultHeartbeatTimeout,
+		ElectionTimeout: defaultElectionTimeout,
 	}
 	}
 
 
 	// Create transporter for raft
 	// Create transporter for raft
@@ -109,8 +111,8 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
 		}
 		}
 	}
 	}
 
 
-	s.raftServer.SetElectionTimeout(time.Duration(s.electionTimeout) * time.Millisecond)
-	s.raftServer.SetHeartbeatTimeout(time.Duration(s.heartbeatTimeout) * time.Millisecond)
+	s.raftServer.SetElectionTimeout(s.ElectionTimeout)
+	s.raftServer.SetHeartbeatTimeout(s.HeartbeatTimeout)
 
 
 	s.raftServer.Start()
 	s.raftServer.Start()
 
 
@@ -232,8 +234,8 @@ func (s *PeerServer) startAsFollower(cluster []string) {
 		if ok {
 		if ok {
 			return
 			return
 		}
 		}
-		log.Warnf("cannot join to cluster via given peers, retry in %d seconds", RetryInterval)
-		time.Sleep(time.Second * RetryInterval)
+		log.Warnf("cannot join to cluster via given peers, retry in %d seconds", retryInterval)
+		time.Sleep(time.Second * retryInterval)
 	}
 	}
 
 
 	log.Fatalf("Cannot join the cluster via given peers after %x retries", s.RetryTimes)
 	log.Fatalf("Cannot join the cluster via given peers after %x retries", s.RetryTimes)

+ 8 - 6
server/timeout.go

@@ -1,11 +1,13 @@
 package server
 package server
 
 
-const (
-	// The amount of time (ms) to elapse without a heartbeat before becoming a candidate.
-	ElectionTimeout = 200
+import (
+	"time"
+)
 
 
+const (
+	// The amount of time to elapse without a heartbeat before becoming a candidate.
+	defaultElectionTimeout = 200 * time.Millisecond
+	
 	// The frequency by which heartbeats are sent to followers.
 	// The frequency by which heartbeats are sent to followers.
-	HeartbeatTimeout = 50
-
-	RetryInterval = 10
+	defaultHeartbeatTimeout = 50 * time.Millisecond
 )
 )

+ 6 - 14
server/transporter.go

@@ -18,7 +18,6 @@ type transporter struct {
 	client     *http.Client
 	client     *http.Client
 	transport  *http.Transport
 	transport  *http.Transport
 	peerServer *PeerServer
 	peerServer *PeerServer
-	tranTimeout time.Duration
 }
 }
 
 
 type dialer func(network, addr string) (net.Conn, error)
 type dialer func(network, addr string) (net.Conn, error)
@@ -28,15 +27,15 @@ type dialer func(network, addr string) (net.Conn, error)
 // whether the user gives the server cert and key
 // whether the user gives the server cert and key
 func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter {
 func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter {
 	// names for each type of timeout, for the sake of clarity
 	// names for each type of timeout, for the sake of clarity
-	dialTimeout := time.Duration(3 * peerServer.heartbeatTimeout + peerServer.electionTimeout) * time.Millisecond
-	responseHeaderTimeout := time.Duration(3 * peerServer.heartbeatTimeout + peerServer.electionTimeout) * time.Millisecond
+	dialTimeout := (3 * peerServer.HeartbeatTimeout) + peerServer.ElectionTimeout
+	responseHeaderTimeout := (3 * peerServer.HeartbeatTimeout) + peerServer.ElectionTimeout
 
 
 	t := transporter{}
 	t := transporter{}
 
 
-	t.tranTimeout = time.Duration(peerServer.heartbeatTimeout) * time.Millisecond
-
 	tr := &http.Transport{
 	tr := &http.Transport{
-		Dial: dialWithTimeoutFactory(dialTimeout),
+		Dial: func(network, addr string) (net.Conn, error) {
+			return net.DialTimeout(network, addr, dialTimeout)
+		},
 		ResponseHeaderTimeout: responseHeaderTimeout,
 		ResponseHeaderTimeout: responseHeaderTimeout,
 	}
 	}
 
 
@@ -52,13 +51,6 @@ func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *
 	return &t
 	return &t
 }
 }
 
 
-// factory function to return a dialer
-func dialWithTimeoutFactory( timeout time.Duration ) dialer {
-	return func(network, addr string) (net.Conn, error) {
-		return net.DialTimeout(network, addr, timeout)
-	}
-}
-
 // Sends AppendEntries RPCs to a peer when the server is the leader.
 // Sends AppendEntries RPCs to a peer when the server is the leader.
 func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
 func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
 	var b bytes.Buffer
 	var b bytes.Buffer
@@ -235,7 +227,7 @@ func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error)
 // Cancel the in-flight HTTP transaction once the timeout elapses.
 // Cancel the in-flight HTTP transaction once the timeout elapses.
 func (t *transporter) CancelWhenTimeout(req *http.Request) {
 func (t *transporter) CancelWhenTimeout(req *http.Request) {
 	go func() {
 	go func() {
-		time.Sleep(t.tranTimeout)
+		time.Sleep(t.peerServer.HeartbeatTimeout)
 		t.transport.CancelRequest(req)
 		t.transport.CancelRequest(req)
 	}()
 	}()
 }
 }

+ 2 - 2
server/usage.go

@@ -44,9 +44,9 @@ Peer Communication Options:
   -peer-cert-file=<path>  Path to the peer cert file.
   -peer-cert-file=<path>  Path to the peer cert file.
   -peer-key-file=<path>   Path to the peer key file.
   -peer-key-file=<path>   Path to the peer key file.
   -peer-heartbeat-timeout=<time>
   -peer-heartbeat-timeout=<time>
-                          Time (in milliseconds) for a heartbeat to timeout
+                          Time (in milliseconds) for a heartbeat to timeout.
   -peer-election-timeout=<time>
   -peer-election-timeout=<time>
-                          Time (in milliseconds) for an election to timeout
+                          Time (in milliseconds) for an election to timeout.
 
 
 Other Options:
 Other Options:
   -max-result-buffer   Max size of the result buffer.
   -max-result-buffer   Max size of the result buffer.

+ 3 - 1
tests/server_utils.go

@@ -26,8 +26,10 @@ func RunServer(f func(*server.Server)) {
 	store := store.New()
 	store := store.New()
 	registry := server.NewRegistry(store)
 	registry := server.NewRegistry(store)
 
 
-	ps := server.NewPeerServer(testName, path, "http://" + testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount, testHeartbeatTimeout, testElectionTimeout)
+	ps := server.NewPeerServer(testName, path, "http://" + testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount)
 	ps.MaxClusterSize = 9
 	ps.MaxClusterSize = 9
+	ps.ElectionTimeout = testElectionTimeout
+	ps.HeartbeatTimeout = testHeartbeatTimeout
 	s := server.New(testName, "http://" + testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store)
 	s := server.New(testName, "http://" + testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store)
 	ps.SetServer(s)
 	ps.SetServer(s)