Browse Source

refactor(raft): init raft transporter & server in main

Brian Waldon 12 years ago
parent
commit
a2ee620394
7 changed files with 58 additions and 40 deletions
  1. 21 3
      etcd.go
  2. 1 1
      server/listener.go
  3. 14 30
      server/peer_server.go
  4. 1 1
      server/raft_follower_stats.go
  5. 1 1
      server/raft_server_stats.go
  6. 1 1
      server/transporter.go
  7. 19 3
      tests/server_utils.go

+ 21 - 3
etcd.go

@@ -110,6 +110,16 @@ func main() {
 	store := store.New()
 	registry := server.NewRegistry(store)
 
+	// Create stats objects
+	followersStats := server.NewRaftFollowersStats(info.Name)
+	serverStats := server.NewRaftServerStats(info.Name)
+
+	// Calculate all of our timeouts
+	heartbeatTimeout := time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond
+	electionTimeout :=  time.Duration(config.Peer.ElectionTimeout) * time.Millisecond
+	dialTimeout := (3 * heartbeatTimeout) + electionTimeout
+	responseHeaderTimeout := (3 * heartbeatTimeout) + electionTimeout
+
 	// Create peer server.
 	psConfig := server.PeerServerConfig{
 		Name:             info.Name,
@@ -117,13 +127,11 @@ func main() {
 		Scheme:           peerTLSConfig.Scheme,
 		URL:              info.RaftURL,
 		SnapshotCount:    config.SnapshotCount,
-		HeartbeatTimeout: time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond,
-		ElectionTimeout:  time.Duration(config.Peer.ElectionTimeout) * time.Millisecond,
 		MaxClusterSize:   config.MaxClusterSize,
 		RetryTimes:       config.MaxRetryAttempts,
 		CORS:             corsInfo,
 	}
-	ps := server.NewPeerServer(psConfig, &peerTLSConfig, &info.RaftTLS, registry, store, &mb)
+	ps := server.NewPeerServer(psConfig, registry, store, &mb, followersStats, serverStats)
 
 	var psListener net.Listener
 	if psConfig.Scheme == "https" {
@@ -135,6 +143,16 @@ func main() {
 		panic(err)
 	}
 
+	// Create Raft transporter and server
+	raftTransporter := server.NewTransporter(peerTLSConfig.Scheme, peerTLSConfig.Client, followersStats, serverStats, registry, heartbeatTimeout, dialTimeout, responseHeaderTimeout)
+	raftServer, err := raft.NewServer(info.Name, config.DataDir, raftTransporter, store, ps, "")
+	if err != nil {
+		log.Fatal(err)
+	}
+	raftServer.SetElectionTimeout(electionTimeout)
+	raftServer.SetHeartbeatTimeout(heartbeatTimeout)
+	ps.SetRaftServer(raftServer)
+
 	// Create client server.
 	sConfig := server.ServerConfig{
 		Name:     info.Name,

+ 1 - 1
server/listener.go

@@ -16,7 +16,7 @@ func NewListener(addr string) (net.Listener, error) {
 	return l, nil
 }
 
-func NewTLSListener(addr, keyFile, certFile string) (net.Listener, error) {
+func NewTLSListener(addr, certFile, keyFile string) (net.Listener, error) {
 	if addr == "" {
 		addr = ":https"
 	}

+ 14 - 30
server/peer_server.go

@@ -43,8 +43,6 @@ type PeerServer struct {
 	raftServer     raft.Server
 	server         *Server
 	joinIndex      uint64
-	tlsConf        *TLSConfig
-	tlsInfo        *TLSInfo
 	followersStats *raftFollowersStats
 	serverStats    *raftServerStats
 	registry       *Registry
@@ -72,9 +70,7 @@ type snapshotConf struct {
 	snapshotThr uint64
 }
 
-func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, mb *metrics.Bucket) *PeerServer {
-	followersStats := newRaftFollowersStats(psConfig.Name)
-	serverStats := newRaftServerStats(psConfig.Name)
+func NewPeerServer(psConfig PeerServerConfig, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer {
 	s := &PeerServer{
 		Config: psConfig,
 		registry: registry,
@@ -86,37 +82,28 @@ func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSIn
 
 		metrics: mb,
 	}
+	return s
+}
 
-	// Create transporter for raft
-	dialTimeout := (3 * psConfig.HeartbeatTimeout) + psConfig.ElectionTimeout
-	responseHeaderTimeout := (3 * psConfig.HeartbeatTimeout) + psConfig.ElectionTimeout
-	raftTransporter := newTransporter(psConfig.Scheme, tlsConf.Client, followersStats, serverStats, registry, psConfig.HeartbeatTimeout, dialTimeout, responseHeaderTimeout)
-
-	// Create raft server
-	raftServer, err := raft.NewServer(psConfig.Name, psConfig.Path, raftTransporter, s.store, s, "")
-	if err != nil {
-		log.Fatal(err)
-	}
-
+func (s *PeerServer) SetRaftServer(raftServer raft.Server) {
 	s.snapConf = &snapshotConf{
 		checkingInterval: time.Second * 3,
 		// this is not accurate, we will update raft to provide an api
 		lastIndex:   raftServer.CommitIndex(),
-		snapshotThr: uint64(psConfig.SnapshotCount),
+		snapshotThr: uint64(s.Config.SnapshotCount),
 	}
 
-	s.raftServer = raftServer
-	s.raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger)
-	s.raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger)
-	s.raftServer.AddEventListener(raft.TermChangeEventType, s.raftEventLogger)
-	s.raftServer.AddEventListener(raft.AddPeerEventType, s.raftEventLogger)
-	s.raftServer.AddEventListener(raft.RemovePeerEventType, s.raftEventLogger)
-	s.raftServer.AddEventListener(raft.HeartbeatTimeoutEventType, s.raftEventLogger)
-	s.raftServer.AddEventListener(raft.ElectionTimeoutThresholdEventType, s.raftEventLogger)
+	raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger)
+	raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger)
+	raftServer.AddEventListener(raft.TermChangeEventType, s.raftEventLogger)
+	raftServer.AddEventListener(raft.AddPeerEventType, s.raftEventLogger)
+	raftServer.AddEventListener(raft.RemovePeerEventType, s.raftEventLogger)
+	raftServer.AddEventListener(raft.HeartbeatTimeoutEventType, s.raftEventLogger)
+	raftServer.AddEventListener(raft.ElectionTimeoutThresholdEventType, s.raftEventLogger)
 
-	s.raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent)
+	raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent)
 
-	return s
+	s.raftServer = raftServer
 }
 
 // Start the raft server
@@ -132,9 +119,6 @@ func (s *PeerServer) Serve(listener net.Listener, snapshot bool, cluster []strin
 		}
 	}
 
-	s.raftServer.SetElectionTimeout(s.Config.ElectionTimeout)
-	s.raftServer.SetHeartbeatTimeout(s.Config.HeartbeatTimeout)
-
 	s.raftServer.Start()
 
 	if s.raftServer.IsLogEmpty() {

+ 1 - 1
server/raft_follower_stats.go

@@ -10,7 +10,7 @@ type raftFollowersStats struct {
 	Followers map[string]*raftFollowerStats `json:"followers"`
 }
 
-func newRaftFollowersStats(name string) *raftFollowersStats {
+func NewRaftFollowersStats(name string) *raftFollowersStats {
 	return &raftFollowersStats{
 		Leader:    name,
 		Followers: make(map[string]*raftFollowerStats),

+ 1 - 1
server/raft_server_stats.go

@@ -29,7 +29,7 @@ type raftServerStats struct {
 	recvRateQueue *statsQueue
 }
 
-func newRaftServerStats(name string) *raftServerStats {
+func NewRaftServerStats(name string) *raftServerStats {
 	return &raftServerStats{
 		Name:      name,
 		StartTime: time.Now(),

+ 1 - 1
server/transporter.go

@@ -29,7 +29,7 @@ type dialer func(network, addr string) (net.Conn, error)
 // Create transporter using by raft server
 // Create http or https transporter based on
 // whether the user give the server cert and key
-func newTransporter(scheme string, tlsConf tls.Config, followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter {
+func NewTransporter(scheme string, tlsConf tls.Config, followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter {
 	tr := &http.Transport{
 		Dial: func(network, addr string) (net.Conn, error) {
 			return net.DialTimeout(network, addr, dialTimeout)

+ 19 - 3
tests/server_utils.go

@@ -5,6 +5,8 @@ import (
 	"os"
 	"time"
 
+	"github.com/coreos/raft"
+
 	"github.com/coreos/etcd/server"
 	"github.com/coreos/etcd/store"
 )
@@ -27,23 +29,37 @@ func RunServer(f func(*server.Server)) {
 	registry := server.NewRegistry(store)
 	corsInfo, _ := server.NewCORSInfo([]string{})
 
+	serverStats := server.NewRaftServerStats(testName)
+	followersStats := server.NewRaftFollowersStats(testName)
+
 	psConfig := server.PeerServerConfig{
 		Name: testName,
 		Path: path,
 		URL: "http://"+testRaftURL,
 		Scheme: "http",
 		SnapshotCount: testSnapshotCount,
-		HeartbeatTimeout: testHeartbeatTimeout,
-		ElectionTimeout: testElectionTimeout,
 		MaxClusterSize: 9,
 		CORS: corsInfo,
 	}
-	ps := server.NewPeerServer(psConfig, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, nil)
+	ps := server.NewPeerServer(psConfig, registry, store, nil, followersStats, serverStats)
 	psListener, err := server.NewListener(testRaftURL)
 	if err != nil {
 		panic(err)
 	}
 
+	// Create Raft transporter and server
+	tls := &server.TLSConfig{Scheme: "http"}
+	dialTimeout := (3 * testHeartbeatTimeout) + testElectionTimeout
+	responseHeaderTimeout := (3 * testHeartbeatTimeout) + testElectionTimeout
+	raftTransporter := server.NewTransporter(tls.Scheme, tls.Client, followersStats, serverStats, registry,	testHeartbeatTimeout, dialTimeout, responseHeaderTimeout)
+	raftServer, err := raft.NewServer(testName, path, raftTransporter, store, ps, "")
+	if err != nil {
+		panic(err)
+	}
+	raftServer.SetElectionTimeout(testElectionTimeout)
+	raftServer.SetHeartbeatTimeout(testHeartbeatTimeout)
+	ps.SetRaftServer(raftServer)
+
 	sConfig := server.ServerConfig{
 		Name: testName,
 		URL: "http://"+testClientURL,