Browse Source

embed: Avoid panic when shutting down gRPC Server

Avoid panic when stopping gRPC Server if TLS configuration is present.
Provided solution (attempts to) implement suggestion from gRPC team: https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531.

Fixes #8916
James Phillips 8 years ago
parent
commit
e39915feec
2 changed files with 75 additions and 14 deletions
  1. 8 8
      embed/etcd.go
  2. 67 6
      embed/serve.go

+ 8 - 8
embed/etcd.go

@@ -100,9 +100,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 			return
 		}
 		if !serving {
-			// errored before starting gRPC server for serveCtx.grpcServerC
+			// errored before starting gRPC server for serveCtx
 			for _, sctx := range e.sctxs {
-				close(sctx.grpcServerC)
+				close(sctx.secureGrpcServerC)
+				close(sctx.insecureGrpcServerC)
 			}
 		}
 		e.Close()
@@ -222,15 +223,14 @@ func (e *Etcd) Config() Config {
 func (e *Etcd) Close() {
 	e.closeOnce.Do(func() { close(e.stopc) })
 
-	for _, sctx := range e.sctxs {
-		for gs := range sctx.grpcServerC {
-			e.stopGRPCServer(gs)
-		}
+	reqTimeout := 2 * time.Second
+	if e.Server != nil {
+		reqTimeout = e.Server.Cfg.ReqTimeout()
 	}
-
 	for _, sctx := range e.sctxs {
-		sctx.cancel()
+		teardownServeCtx(sctx, reqTimeout)
 	}
+
 	for i := range e.Clients {
 		if e.Clients[i] != nil {
 			e.Clients[i].Close()

+ 67 - 6
embed/serve.go

@@ -21,6 +21,7 @@ import (
 	"net"
 	"net/http"
 	"strings"
+	"time"
 
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/v3client"
@@ -54,13 +55,20 @@ type serveCtx struct {
 
 	userHandlers    map[string]http.Handler
 	serviceRegister func(*grpc.Server)
-	grpcServerC     chan *grpc.Server
+
+	secureHTTPServer    *http.Server
+	secureGrpcServerC   chan *grpc.Server
+	insecureGrpcServerC chan *grpc.Server
 }
 
 func newServeCtx() *serveCtx {
 	ctx, cancel := context.WithCancel(context.Background())
-	return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler),
-		grpcServerC: make(chan *grpc.Server, 2), // in case sctx.insecure,sctx.secure true
+	return &serveCtx{
+		ctx:                 ctx,
+		cancel:              cancel,
+		userHandlers:        make(map[string]http.Handler),
+		secureGrpcServerC:   make(chan *grpc.Server, 1),
+		insecureGrpcServerC: make(chan *grpc.Server, 1),
 	}
 }
 
@@ -84,7 +92,7 @@ func (sctx *serveCtx) serve(
 
 	if sctx.insecure {
 		gs := v3rpc.Server(s, nil, gopts...)
-		sctx.grpcServerC <- gs
+		sctx.insecureGrpcServerC <- gs
 		v3electionpb.RegisterElectionServer(gs, servElection)
 		v3lockpb.RegisterLockServer(gs, servLock)
 		if sctx.serviceRegister != nil {
@@ -118,7 +126,7 @@ func (sctx *serveCtx) serve(
 			return tlsErr
 		}
 		gs := v3rpc.Server(s, tlscfg, gopts...)
-		sctx.grpcServerC <- gs
+		sctx.secureGrpcServerC <- gs
 		v3electionpb.RegisterElectionServer(gs, servElection)
 		v3lockpb.RegisterLockServer(gs, servLock)
 		if sctx.serviceRegister != nil {
@@ -149,11 +157,13 @@ func (sctx *serveCtx) serve(
 			ErrorLog:  logger, // do not log user error
 		}
 		go func() { errHandler(srv.Serve(tlsl)) }()
+		sctx.secureHTTPServer = srv
 
 		plog.Infof("serving client requests on %s", sctx.l.Addr().String())
 	}
 
-	close(sctx.grpcServerC)
+	close(sctx.secureGrpcServerC)
+	close(sctx.insecureGrpcServerC)
 	return m.Serve()
 }
 
@@ -269,3 +279,54 @@ func (sctx *serveCtx) registerTrace() {
 	evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
 	sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
 }
+
+// Attempt to gracefully tear down gRPC server(s) and any associated mechanisms
+func teardownServeCtx(sctx *serveCtx, timeout time.Duration) {
+	if sctx.secure && len(sctx.secureGrpcServerC) > 0 {
+		gs := <-sctx.secureGrpcServerC
+		stopSecureServer(gs, sctx.secureHTTPServer, timeout)
+	}
+
+	if sctx.insecure && len(sctx.insecureGrpcServerC) > 0 {
+		gs := <-sctx.insecureGrpcServerC
+		stopInsecureServer(gs, timeout)
+	}
+
+	// Close any open gRPC connections
+	sctx.cancel()
+}
+
+// When using grpc's ServerHandlerTransport we are responsible for gracefully
+// stopping connections and shutting down.
+// https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
+func stopSecureServer(gs *grpc.Server, httpSrv *http.Server, timeout time.Duration) {
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	// Stop accepting new connections await pending handlers
+	httpSrv.Shutdown(ctx)
+
+	// Teardown gRPC server
+	gs.Stop()
+}
+
+// Gracefully shutdown gRPC server when using HTTP2 transport.
+func stopInsecureServer(gs *grpc.Server, timeout time.Duration) {
+	ch := make(chan struct{})
+	go func() {
+		defer close(ch)
+		// close listeners to stop accepting new connections,
+		// will block on any existing transports
+		gs.GracefulStop()
+	}()
+	// wait until all pending RPCs are finished
+	select {
+	case <-ch:
+	case <-time.After(timeout):
+		// took too long, manually close open transports
+		// e.g. watch streams
+		gs.Stop()
+		// concurrent GracefulStop should be interrupted
+		<-ch
+	}
+}