Browse Source

Merge pull request #8535 from gyuho/keepalive-server

*: configure server keepalive
Gyu-Ho Lee 8 years ago
parent
commit
10202a54ef
6 changed files with 82 additions and 30 deletions
  1. 47 24
      embed/config.go
  2. 16 1
      embed/etcd.go
  3. 8 3
      embed/serve.go
  4. 3 0
      etcdmain/config.go
  5. 6 0
      etcdmain/help.go
  6. 2 2
      etcdserver/api/v3rpc/grpc.go

+ 47 - 24
embed/config.go

@@ -39,11 +39,14 @@ const (
 	ClusterStateFlagNew      = "new"
 	ClusterStateFlagExisting = "existing"
 
-	DefaultName            = "default"
-	DefaultMaxSnapshots    = 5
-	DefaultMaxWALs         = 5
-	DefaultMaxTxnOps       = uint(128)
-	DefaultMaxRequestBytes = 1.5 * 1024 * 1024
+	DefaultName                  = "default"
+	DefaultMaxSnapshots          = 5
+	DefaultMaxWALs               = 5
+	DefaultMaxTxnOps             = uint(128)
+	DefaultMaxRequestBytes       = 1.5 * 1024 * 1024
+	DefaultGRPCKeepAliveMinTime  = 5 * time.Second
+	DefaultGRPCKeepAliveInterval = 2 * time.Hour
+	DefaultGRPCKeepAliveTimeout  = 20 * time.Second
 
 	DefaultListenPeerURLs   = "http://localhost:2380"
 	DefaultListenClientURLs = "http://localhost:2379"
@@ -93,6 +96,23 @@ type Config struct {
 	MaxTxnOps         uint  `json:"max-txn-ops"`
 	MaxRequestBytes   uint  `json:"max-request-bytes"`
 
+	// gRPC server options
+
+	// GRPCKeepAliveMinTime is the minimum interval that a client should
+	// wait before pinging server. When client pings "too fast", server
+	// sends goaway and closes the connection (errors: too_many_pings,
+	// http2.ErrCodeEnhanceYourCalm). When too slow, nothing happens.
+	// Server expects client pings only when there is any active streams
+	// (PermitWithoutStream is set false).
+	GRPCKeepAliveMinTime time.Duration `json:"grpc-keepalive-min-time"`
+	// GRPCKeepAliveInterval is the frequency of server-to-client ping
+	// to check if a connection is alive. Close a non-responsive connection
+	// after an additional duration of Timeout. 0 to disable.
+	GRPCKeepAliveInterval time.Duration `json:"grpc-keepalive-interval"`
+	// GRPCKeepAliveTimeout is the additional duration of wait
+	// before closing a non-responsive connection. 0 to disable.
+	GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"`
+
 	// clustering
 
 	APUrls, ACUrls      []url.URL
@@ -181,25 +201,28 @@ func NewConfig() *Config {
 	lcurl, _ := url.Parse(DefaultListenClientURLs)
 	acurl, _ := url.Parse(DefaultAdvertiseClientURLs)
 	cfg := &Config{
-		CorsInfo:            &cors.CORSInfo{},
-		MaxSnapFiles:        DefaultMaxSnapshots,
-		MaxWalFiles:         DefaultMaxWALs,
-		Name:                DefaultName,
-		SnapCount:           etcdserver.DefaultSnapCount,
-		MaxTxnOps:           DefaultMaxTxnOps,
-		MaxRequestBytes:     DefaultMaxRequestBytes,
-		TickMs:              100,
-		ElectionMs:          1000,
-		LPUrls:              []url.URL{*lpurl},
-		LCUrls:              []url.URL{*lcurl},
-		APUrls:              []url.URL{*apurl},
-		ACUrls:              []url.URL{*acurl},
-		ClusterState:        ClusterStateFlagNew,
-		InitialClusterToken: "etcd-cluster",
-		StrictReconfigCheck: true,
-		Metrics:             "basic",
-		EnableV2:            true,
-		AuthToken:           "simple",
+		CorsInfo:              &cors.CORSInfo{},
+		MaxSnapFiles:          DefaultMaxSnapshots,
+		MaxWalFiles:           DefaultMaxWALs,
+		Name:                  DefaultName,
+		SnapCount:             etcdserver.DefaultSnapCount,
+		MaxTxnOps:             DefaultMaxTxnOps,
+		MaxRequestBytes:       DefaultMaxRequestBytes,
+		GRPCKeepAliveMinTime:  DefaultGRPCKeepAliveMinTime,
+		GRPCKeepAliveInterval: DefaultGRPCKeepAliveInterval,
+		GRPCKeepAliveTimeout:  DefaultGRPCKeepAliveTimeout,
+		TickMs:                100,
+		ElectionMs:            1000,
+		LPUrls:                []url.URL{*lpurl},
+		LCUrls:                []url.URL{*lcurl},
+		APUrls:                []url.URL{*apurl},
+		ACUrls:                []url.URL{*acurl},
+		ClusterState:          ClusterStateFlagNew,
+		InitialClusterToken:   "etcd-cluster",
+		StrictReconfigCheck:   true,
+		Metrics:               "basic",
+		EnableV2:              true,
+		AuthToken:             "simple",
 	}
 	cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
 	return cfg

+ 16 - 1
embed/etcd.go

@@ -42,6 +42,7 @@ import (
 
 	"github.com/soheilhy/cmux"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
 )
 
 var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed")
@@ -424,9 +425,23 @@ func (e *Etcd) serve() (err error) {
 	}
 	h = http.Handler(&cors.CORSHandler{Handler: h, Info: e.cfg.CorsInfo})
 
+	gopts := []grpc.ServerOption{}
+	if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
+		gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
+			MinTime:             e.cfg.GRPCKeepAliveMinTime,
+			PermitWithoutStream: false,
+		}))
+	}
+	if e.cfg.GRPCKeepAliveInterval > time.Duration(0) &&
+		e.cfg.GRPCKeepAliveTimeout > time.Duration(0) {
+		gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
+			Time:    e.cfg.GRPCKeepAliveInterval,
+			Timeout: e.cfg.GRPCKeepAliveTimeout,
+		}))
+	}
 	for _, sctx := range e.sctxs {
 		go func(s *serveCtx) {
-			e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler))
+			e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...))
 		}(sctx)
 	}
 

+ 8 - 3
embed/serve.go

@@ -66,7 +66,12 @@ func newServeCtx() *serveCtx {
 // serve accepts incoming connections on the listener l,
 // creating a new service goroutine for each. The service goroutines
 // read requests and then call handler to reply to them.
-func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo, handler http.Handler, errHandler func(error)) error {
+func (sctx *serveCtx) serve(
+	s *etcdserver.EtcdServer,
+	tlsinfo *transport.TLSInfo,
+	handler http.Handler,
+	errHandler func(error),
+	gopts ...grpc.ServerOption) error {
 	logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
 	<-s.ReadyNotify()
 	plog.Info("ready to serve client requests")
@@ -77,7 +82,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo
 	servLock := v3lock.NewLockServer(v3c)
 
 	if sctx.insecure {
-		gs := v3rpc.Server(s, nil)
+		gs := v3rpc.Server(s, nil, gopts...)
 		sctx.grpcServerC <- gs
 		v3electionpb.RegisterElectionServer(gs, servElection)
 		v3lockpb.RegisterLockServer(gs, servLock)
@@ -111,7 +116,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo
 		if tlsErr != nil {
 			return tlsErr
 		}
-		gs := v3rpc.Server(s, tlscfg)
+		gs := v3rpc.Server(s, tlscfg, gopts...)
 		sctx.grpcServerC <- gs
 		v3electionpb.RegisterElectionServer(gs, servElection)
 		v3lockpb.RegisterLockServer(gs, servLock)

+ 3 - 0
etcdmain/config.go

@@ -143,6 +143,9 @@ func newConfig() *config {
 	fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
 	fs.UintVar(&cfg.MaxTxnOps, "max-txn-ops", cfg.MaxTxnOps, "Maximum number of operations permitted in a transaction.")
 	fs.UintVar(&cfg.MaxRequestBytes, "max-request-bytes", cfg.MaxRequestBytes, "Maximum client request size in bytes the server will accept.")
+	fs.DurationVar(&cfg.GRPCKeepAliveMinTime, "grpc-keepalive-min-time", cfg.Config.GRPCKeepAliveMinTime, "Minimum interval duration that a client should wait before pinging server.")
+	fs.DurationVar(&cfg.GRPCKeepAliveInterval, "grpc-keepalive-interval", cfg.Config.GRPCKeepAliveInterval, "Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).")
+	fs.DurationVar(&cfg.GRPCKeepAliveTimeout, "grpc-keepalive-timeout", cfg.Config.GRPCKeepAliveTimeout, "Additional duration of wait before closing a non-responsive connection (0 to disable).")
 
 	// clustering
 	fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")

+ 6 - 0
etcdmain/help.go

@@ -70,6 +70,12 @@ member flags:
 		maximum number of operations permitted in a transaction.
 	--max-request-bytes '1572864'
 		maximum client request size in bytes the server will accept.
+	--grpc-keepalive-min-time '5s'
+		minimum duration interval that a client should wait before pinging server.
+	--grpc-keepalive-interval '2h'
+		frequency duration of server-to-client ping to check if a connection is alive (0 to disable).
+	--grpc-keepalive-timeout '20s'
+		additional duration of wait before closing a non-responsive connection (0 to disable).
 
 clustering flags:
 

+ 2 - 2
etcdserver/api/v3rpc/grpc.go

@@ -39,7 +39,7 @@ func init() {
 	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
 }
 
-func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
+func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
 	var opts []grpc.ServerOption
 	opts = append(opts, grpc.CustomCodec(&codec{}))
 	if tls != nil {
@@ -50,7 +50,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
 	opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
 	opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
 	opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
-	grpcServer := grpc.NewServer(opts...)
+	grpcServer := grpc.NewServer(append(opts, gopts...)...)
 
 	pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
 	pb.RegisterWatchServer(grpcServer, NewWatchServer(s))