Browse Source

embed: only gracefully shutdown insecure grpc.Server

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 8 years ago
parent
commit
552b58dcfb
2 changed files with 57 additions and 85 deletions
  1. 44 16
      embed/etcd.go
  2. 13 69
      embed/serve.go

+ 44 - 16
embed/etcd.go

@@ -100,10 +100,9 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 			return
 		}
 		if !serving {
-			// errored before starting gRPC server for serveCtx
+			// errored before starting gRPC server for serveCtx.serversC
 			for _, sctx := range e.sctxs {
-				close(sctx.secureGrpcServerC)
-				close(sctx.insecureGrpcServerC)
+				close(sctx.serversC)
 			}
 		}
 		e.Close()
@@ -220,15 +219,27 @@ func (e *Etcd) Config() Config {
 	return e.cfg
 }
 
+// Close gracefully shuts down all servers/listeners.
+// Client requests will be terminated with request timeout.
+// After timeout, enforce remaning requests be closed immediately.
 func (e *Etcd) Close() {
 	e.closeOnce.Do(func() { close(e.stopc) })
 
-	reqTimeout := 2 * time.Second
+	// close client requests with request timeout
+	timeout := 2 * time.Second
 	if e.Server != nil {
-		reqTimeout = e.Server.Cfg.ReqTimeout()
+		timeout = e.Server.Cfg.ReqTimeout()
 	}
 	for _, sctx := range e.sctxs {
-		teardownServeCtx(sctx, reqTimeout)
+		for ss := range sctx.serversC {
+			ctx, cancel := context.WithTimeout(context.Background(), timeout)
+			stopServers(ctx, ss)
+			cancel()
+		}
+	}
+
+	for _, sctx := range e.sctxs {
+		sctx.cancel()
 	}
 
 	for i := range e.Clients {
@@ -236,6 +247,7 @@ func (e *Etcd) Close() {
 			e.Clients[i].Close()
 		}
 	}
+
 	for i := range e.metricsListeners {
 		e.metricsListeners[i].Close()
 	}
@@ -255,25 +267,38 @@ func (e *Etcd) Close() {
 	}
 }
 
-func (e *Etcd) stopGRPCServer(gs *grpc.Server) {
-	timeout := 2 * time.Second
-	if e.Server != nil {
-		timeout = e.Server.Cfg.ReqTimeout()
+func stopServers(ctx context.Context, ss *servers) {
+	shutdownNow := func() {
+		// first, close the http.Server
+		ss.http.Shutdown(ctx)
+		// then close grpc.Server; cancels all active RPCs
+		ss.grpc.Stop()
+	}
+
+	// do not grpc.Server.GracefulStop with TLS enabled etcd server
+	// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
+	// and https://github.com/coreos/etcd/issues/8916
+	if ss.secure {
+		shutdownNow()
+		return
 	}
+
 	ch := make(chan struct{})
 	go func() {
 		defer close(ch)
 		// close listeners to stop accepting new connections,
 		// will block on any existing transports
-		gs.GracefulStop()
+		ss.grpc.GracefulStop()
 	}()
+
 	// wait until all pending RPCs are finished
 	select {
 	case <-ch:
-	case <-time.After(timeout):
+	case <-ctx.Done():
 		// took too long, manually close open transports
 		// e.g. watch streams
-		gs.Stop()
+		shutdownNow()
+
 		// concurrent GracefulStop should be interrupted
 		<-ch
 	}
@@ -297,7 +322,9 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
 		for i := range peers {
 			if peers[i] != nil && peers[i].close != nil {
 				plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String())
-				peers[i].close(context.Background())
+				ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+				peers[i].close(ctx)
+				cancel()
 			}
 		}
 	}()
@@ -334,6 +361,7 @@ func (e *Etcd) servePeers() (err error) {
 			return err
 		}
 	}
+
 	for _, p := range e.Peers {
 		gs := v3rpc.Server(e.Server, peerTLScfg)
 		m := cmux.New(p.Listener)
@@ -349,8 +377,8 @@ func (e *Etcd) servePeers() (err error) {
 			// gracefully shutdown http.Server
 			// close open listeners, idle connections
 			// until context cancel or time-out
-			e.stopGRPCServer(gs)
-			return srv.Shutdown(ctx)
+			stopServers(ctx, &servers{secure: peerTLScfg != nil, grpc: gs, http: srv})
+			return nil
 		}
 	}
 

+ 13 - 69
embed/serve.go

@@ -21,7 +21,6 @@ import (
 	"net"
 	"net/http"
 	"strings"
-	"time"
 
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/v3client"
@@ -55,20 +54,19 @@ type serveCtx struct {
 
 	userHandlers    map[string]http.Handler
 	serviceRegister func(*grpc.Server)
+	serversC        chan *servers
+}
 
-	secureHTTPServer    *http.Server
-	secureGrpcServerC   chan *grpc.Server
-	insecureGrpcServerC chan *grpc.Server
+type servers struct {
+	secure bool
+	grpc   *grpc.Server
+	http   *http.Server
 }
 
 func newServeCtx() *serveCtx {
 	ctx, cancel := context.WithCancel(context.Background())
-	return &serveCtx{
-		ctx:                 ctx,
-		cancel:              cancel,
-		userHandlers:        make(map[string]http.Handler),
-		secureGrpcServerC:   make(chan *grpc.Server, 1),
-		insecureGrpcServerC: make(chan *grpc.Server, 1),
+	return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler),
+		serversC: make(chan *servers, 2), // in case sctx.insecure,sctx.secure true
 	}
 }
 
@@ -92,7 +90,6 @@ func (sctx *serveCtx) serve(
 
 	if sctx.insecure {
 		gs := v3rpc.Server(s, nil, gopts...)
-		sctx.insecureGrpcServerC <- gs
 		v3electionpb.RegisterElectionServer(gs, servElection)
 		v3lockpb.RegisterLockServer(gs, servLock)
 		if sctx.serviceRegister != nil {
@@ -101,9 +98,7 @@ func (sctx *serveCtx) serve(
 		grpcl := m.Match(cmux.HTTP2())
 		go func() { errHandler(gs.Serve(grpcl)) }()
 
-		opts := []grpc.DialOption{
-			grpc.WithInsecure(),
-		}
+		opts := []grpc.DialOption{grpc.WithInsecure()}
 		gwmux, err := sctx.registerGateway(opts)
 		if err != nil {
 			return err
@@ -117,6 +112,8 @@ func (sctx *serveCtx) serve(
 		}
 		httpl := m.Match(cmux.HTTP1())
 		go func() { errHandler(srvhttp.Serve(httpl)) }()
+
+		sctx.serversC <- &servers{grpc: gs, http: srvhttp}
 		plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String())
 	}
 
@@ -126,7 +123,6 @@ func (sctx *serveCtx) serve(
 			return tlsErr
 		}
 		gs := v3rpc.Server(s, tlscfg, gopts...)
-		sctx.secureGrpcServerC <- gs
 		v3electionpb.RegisterElectionServer(gs, servElection)
 		v3lockpb.RegisterLockServer(gs, servLock)
 		if sctx.serviceRegister != nil {
@@ -157,13 +153,12 @@ func (sctx *serveCtx) serve(
 			ErrorLog:  logger, // do not log user error
 		}
 		go func() { errHandler(srv.Serve(tlsl)) }()
-		sctx.secureHTTPServer = srv
 
+		sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
 		plog.Infof("serving client requests on %s", sctx.l.Addr().String())
 	}
 
-	close(sctx.secureGrpcServerC)
-	close(sctx.insecureGrpcServerC)
+	close(sctx.serversC)
 	return m.Serve()
 }
 
@@ -279,54 +274,3 @@ func (sctx *serveCtx) registerTrace() {
 	evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
 	sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
 }
-
-// Attempt to gracefully tear down gRPC server(s) and any associated mechanisms
-func teardownServeCtx(sctx *serveCtx, timeout time.Duration) {
-	if sctx.secure && len(sctx.secureGrpcServerC) > 0 {
-		gs := <-sctx.secureGrpcServerC
-		stopSecureServer(gs, sctx.secureHTTPServer, timeout)
-	}
-
-	if sctx.insecure && len(sctx.insecureGrpcServerC) > 0 {
-		gs := <-sctx.insecureGrpcServerC
-		stopInsecureServer(gs, timeout)
-	}
-
-	// Close any open gRPC connections
-	sctx.cancel()
-}
-
-// When using grpc's ServerHandlerTransport we are responsible for gracefully
-// stopping connections and shutting down.
-// https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
-func stopSecureServer(gs *grpc.Server, httpSrv *http.Server, timeout time.Duration) {
-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
-	defer cancel()
-
-	// Stop accepting new connections await pending handlers
-	httpSrv.Shutdown(ctx)
-
-	// Teardown gRPC server
-	gs.Stop()
-}
-
-// Gracefully shutdown gRPC server when using HTTP2 transport.
-func stopInsecureServer(gs *grpc.Server, timeout time.Duration) {
-	ch := make(chan struct{})
-	go func() {
-		defer close(ch)
-		// close listeners to stop accepting new connections,
-		// will block on any existing transports
-		gs.GracefulStop()
-	}()
-	// wait until all pending RPCs are finished
-	select {
-	case <-ch:
-	case <-time.After(timeout):
-		// took too long, manually close open transports
-		// e.g. watch streams
-		gs.Stop()
-		// concurrent GracefulStop should be interrupted
-		<-ch
-	}
-}