Browse Source

feat(main): add close and ready notifiers for etcd instance

Cenk Alti 11 years ago
parent
commit
399f491fd7
1 changed files with 38 additions and 2 deletions
  1. 38 2
      etcd/etcd.go

+ 38 - 2
etcd/etcd.go

@@ -22,6 +22,7 @@ import (
 	"os"
 	"path/filepath"
 	"runtime"
+	"strings"
 	"time"
 
 	goetcd "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
@@ -44,6 +45,8 @@ type Etcd struct {
 	PeerServer   *server.PeerServer // peer server, runs on 7001 by default
 	listener     net.Listener       // Listener for Server
 	peerListener net.Listener       // Listener for PeerServer
+	readyC       chan bool          // To signal when server is ready to accept connections
+	closeC       chan bool          // To signal when etcd is closed with Close() method
 }
 
 // New returns a new Etcd instance.
@@ -53,6 +56,8 @@ func New(c *config.Config) *Etcd {
 	}
 	return &Etcd{
 		Config: c,
+		readyC: make(chan bool),
+		closeC: make(chan bool),
 	}
 }
 
@@ -168,6 +173,11 @@ func (e *Etcd) Run() {
 		e.Server.EnableTracing()
 	}
 
+	// An error string equivalent to net.errClosing for using with
+	// http.Serve() during server shutdown. Need to re-declare
+	// here because it is not exported by "net" package.
+	const errClosing = "use of closed network connection"
+
 	e.PeerServer.SetServer(e.Server)
 
 	// Generating config could be slow.
@@ -175,6 +185,7 @@ func (e *Etcd) Run() {
 	peerTLSConfig := server.TLSServerConfig(e.Config.PeerTLSInfo())
 	etcdTLSConfig := server.TLSServerConfig(e.Config.EtcdTLSInfo())
 
+	peerServerClosed := make(chan bool)
 	go func() {
 		// Starting peer server should be followed close by listening on its port
 		// If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster.
@@ -185,16 +196,41 @@ func (e *Etcd) Run() {
 		e.peerListener = server.NewListener(psConfig.Scheme, e.Config.Peer.BindAddr, e.Config.PeerTLSInfo())
 
 		sHTTP := &ehttp.CORSHandler{e.PeerServer.HTTPHandler(), corsInfo}
-		log.Fatal(http.Serve(e.peerListener, sHTTP))
+		if err = http.Serve(e.peerListener, sHTTP); err != nil {
+			if !strings.Contains(err.Error(), errClosing) {
+				log.Fatal(err)
+			}
+		}
+		close(peerServerClosed)
 	}()
 
 	log.Infof("etcd server [name %s, listen on %s, advertised url %s]", e.Server.Name, e.Config.BindAddr, e.Server.URL())
 	e.listener = server.NewListener(e.Config.EtcdTLSInfo().Scheme(), e.Config.BindAddr, e.Config.EtcdTLSInfo())
+
+	close(e.readyC) // etcd server is ready to accept connections, notify waiters.
+
 	sHTTP := &ehttp.CORSHandler{e.Server.HTTPHandler(), corsInfo}
-	log.Fatal(http.Serve(e.listener, sHTTP))
+	if err = http.Serve(e.listener, sHTTP); err != nil {
+		if !strings.Contains(err.Error(), errClosing) {
+			log.Fatal(err)
+		}
+	}
+
+	<-peerServerClosed
+	close(e.closeC) // etcd instance is closed, notify waiters.
+
+	log.Infof("etcd instance is closed [name %s]", e.Config.Name)
 }
 
 func (e *Etcd) Close() {
 	e.listener.Close()
 	e.peerListener.Close()
 }
+
+func (e *Etcd) CloseNotify() chan bool {
+	return e.closeC
+}
+
+func (e *Etcd) ReadyNotify() chan bool {
+	return e.readyC
+}