Browse Source

Merge pull request #651 from unihorn/48

fix(tests): pass all tests using latest raft
Yicheng Qin 11 years ago
parent
commit
4f1607b775
3 changed files with 35 additions and 11 deletions
  1. 12 3
      etcd.go
  2. 16 7
      server/listener.go
  3. 7 1
      server/peer_server.go

+ 12 - 3
etcd.go

@@ -149,18 +149,27 @@ func main() {
 	}
 
 	ps.SetServer(s)
-	ps.Start(config.Snapshot, config.Discovery, config.Peers)
+
+	// Generating the TLS configs could be slow.
+	// Do it here so that listening happens immediately after the peer server starts.
+	peerTLSConfig := server.TLSServerConfig(config.PeerTLSInfo())
+	etcdTLSConfig := server.TLSServerConfig(config.EtcdTLSInfo())
 
 	go func() {
+		// Starting the peer server should be closely followed by listening on its port.
+		// If not, many requests may be left unaccepted, or heartbeats from the cluster may not be received.
+		// One severe problem caused by failing to receive heartbeats occurs when a second node joins a one-node cluster:
+		// the cluster may stop working for as long as the two nodes cannot exchange messages.
+		ps.Start(config.Snapshot, config.Discovery, config.Peers)
 		log.Infof("peer server [name %s, listen on %s, advertised url %s]", ps.Config.Name, config.Peer.BindAddr, ps.Config.URL)
-		l := server.NewListener(psConfig.Scheme, config.Peer.BindAddr, config.PeerTLSInfo())
+		l := server.NewListener(psConfig.Scheme, config.Peer.BindAddr, peerTLSConfig)
 
 		sHTTP := &ehttp.CORSHandler{ps.HTTPHandler(), corsInfo}
 		log.Fatal(http.Serve(l, sHTTP))
 	}()
 
 	log.Infof("etcd server [name %s, listen on %s, advertised url %s]", s.Name, config.BindAddr, s.URL())
-	l := server.NewListener(config.EtcdTLSInfo().Scheme(), config.BindAddr, config.EtcdTLSInfo())
+	l := server.NewListener(config.EtcdTLSInfo().Scheme(), config.BindAddr, etcdTLSConfig)
 	sHTTP := &ehttp.CORSHandler{s.HTTPHandler(), corsInfo}
 	log.Fatal(http.Serve(l, sHTTP))
 }

+ 16 - 7
server/listener.go

@@ -7,16 +7,25 @@ import (
 	"github.com/coreos/etcd/log"
 )
 
+// TLSServerConfig generates a TLS configuration based on TLSInfo, or returns nil if KeyFile or CertFile is empty.
+// If any error happens, this function will call log.Fatal.
+func TLSServerConfig(info *TLSInfo) *tls.Config {
+	if info.KeyFile == "" || info.CertFile == "" {
+		return nil
+	}
+
+	cfg, err := info.ServerConfig()
+	if err != nil {
+		log.Fatal("TLS info error: ", err)
+	}
+	return cfg
+}
+
 // NewListener creates a net.Listener
-// If the given scheme is "https", it will generate TLS configuration based on TLSInfo.
+// If the given scheme is "https", it will use the given TLS config to set up the listener.
 // If any error happens, this function will call log.Fatal
-func NewListener(scheme, addr string, tlsInfo *TLSInfo) net.Listener {
+func NewListener(scheme, addr string, cfg *tls.Config) net.Listener {
 	if scheme == "https" {
-		cfg, err := tlsInfo.ServerConfig()
-		if err != nil {
-			log.Fatal("TLS info error: ", err)
-		}
-
 		l, err := newTLSListener(addr, cfg)
 		if err != nil {
 			log.Fatal("Failed to create TLS listener: ", err)

+ 7 - 1
server/peer_server.go

@@ -200,6 +200,8 @@ func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err er
 // 1. -discovery
 // 2. -peers
 // 3. previous peers in -data-dir
+// RaftServer should be started as late as possible. The current way of
+// starting it is not ideal and will be refactored in #627.
 func (s *PeerServer) findCluster(discoverURL string, peers []string) {
 	// Attempt cluster discovery
 	toDiscover := discoverURL != ""
@@ -250,6 +252,7 @@ func (s *PeerServer) findCluster(discoverURL string, peers []string) {
 		if !ok {
 			log.Warn("No living peers are found!")
 		} else {
+			s.raftServer.Start()
 			log.Debugf("%s restart as a follower based on peers[%v]", s.Config.Name)
 			return
 		}
@@ -257,6 +260,7 @@ func (s *PeerServer) findCluster(discoverURL string, peers []string) {
 
 	if !s.raftServer.IsLogEmpty() {
 		log.Debug("Entire cluster is down! %v will restart the cluster.", s.Config.Name)
+		s.raftServer.Start()
 		return
 	}
 
@@ -285,7 +289,7 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er
 		}
 	}
 
-	s.raftServer.Start()
+	s.raftServer.Init()
 
 	s.findCluster(discoverURL, peers)
 
@@ -351,6 +355,7 @@ func (s *PeerServer) SetServer(server *Server) {
 }
 
 func (s *PeerServer) startAsLeader() {
+	s.raftServer.Start()
 	// leader need to join self as a peer
 	for {
 		c := &JoinCommandV1{
@@ -373,6 +378,7 @@ func (s *PeerServer) startAsFollower(cluster []string) {
 	for i := 0; i < s.Config.RetryTimes; i++ {
 		ok := s.joinCluster(cluster)
 		if ok {
+			s.raftServer.Start()
 			return
 		}
 		log.Warnf("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval)