Browse Source

integration: configure keepalive parameters for server

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
Gyu-Ho Lee 8 years ago
parent
commit
044aca7f50
1 changed files with 47 additions and 21 deletions
  1. 47 21
      integration/cluster.go

+ 47 - 21
integration/cluster.go

@@ -53,6 +53,7 @@ import (
 	"github.com/coreos/pkg/capnslog"
 	"github.com/coreos/pkg/capnslog"
 	"github.com/soheilhy/cmux"
 	"github.com/soheilhy/cmux"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
 )
 )
 
 
 const (
 const (
@@ -90,14 +91,17 @@ var (
 )
 )
 
 
 type ClusterConfig struct {
 type ClusterConfig struct {
-	Size              int
-	PeerTLS           *transport.TLSInfo
-	ClientTLS         *transport.TLSInfo
-	DiscoveryURL      string
-	UseGRPC           bool
-	QuotaBackendBytes int64
-	MaxTxnOps         uint
-	MaxRequestBytes   uint
+	Size                  int
+	PeerTLS               *transport.TLSInfo
+	ClientTLS             *transport.TLSInfo
+	DiscoveryURL          string
+	UseGRPC               bool
+	QuotaBackendBytes     int64
+	MaxTxnOps             uint
+	MaxRequestBytes       uint
+	GRPCKeepAliveMinTime  time.Duration
+	GRPCKeepAliveInterval time.Duration
+	GRPCKeepAliveTimeout  time.Duration
 }
 }
 
 
 type cluster struct {
 type cluster struct {
@@ -225,12 +229,15 @@ func (c *cluster) HTTPMembers() []client.Member {
 func (c *cluster) mustNewMember(t *testing.T) *member {
 func (c *cluster) mustNewMember(t *testing.T) *member {
 	m := mustNewMember(t,
 	m := mustNewMember(t,
 		memberConfig{
 		memberConfig{
-			name:              c.name(rand.Int()),
-			peerTLS:           c.cfg.PeerTLS,
-			clientTLS:         c.cfg.ClientTLS,
-			quotaBackendBytes: c.cfg.QuotaBackendBytes,
-			maxTxnOps:         c.cfg.MaxTxnOps,
-			maxRequestBytes:   c.cfg.MaxRequestBytes,
+			name:                  c.name(rand.Int()),
+			peerTLS:               c.cfg.PeerTLS,
+			clientTLS:             c.cfg.ClientTLS,
+			quotaBackendBytes:     c.cfg.QuotaBackendBytes,
+			maxTxnOps:             c.cfg.MaxTxnOps,
+			maxRequestBytes:       c.cfg.MaxRequestBytes,
+			grpcKeepAliveMinTime:  c.cfg.GRPCKeepAliveMinTime,
+			grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
+			grpcKeepAliveTimeout:  c.cfg.GRPCKeepAliveTimeout,
 		})
 		})
 	m.DiscoveryURL = c.cfg.DiscoveryURL
 	m.DiscoveryURL = c.cfg.DiscoveryURL
 	if c.cfg.UseGRPC {
 	if c.cfg.UseGRPC {
@@ -482,6 +489,7 @@ type member struct {
 	s             *etcdserver.EtcdServer
 	s             *etcdserver.EtcdServer
 	serverClosers []func()
 	serverClosers []func()
 
 
+	grpcServerOpts []grpc.ServerOption
 	grpcServer     *grpc.Server
 	grpcServer     *grpc.Server
 	grpcServerPeer *grpc.Server
 	grpcServerPeer *grpc.Server
 	grpcAddr       string
 	grpcAddr       string
@@ -496,12 +504,15 @@ type member struct {
 func (m *member) GRPCAddr() string { return m.grpcAddr }
 func (m *member) GRPCAddr() string { return m.grpcAddr }
 
 
 type memberConfig struct {
 type memberConfig struct {
-	name              string
-	peerTLS           *transport.TLSInfo
-	clientTLS         *transport.TLSInfo
-	quotaBackendBytes int64
-	maxTxnOps         uint
-	maxRequestBytes   uint
+	name                  string
+	peerTLS               *transport.TLSInfo
+	clientTLS             *transport.TLSInfo
+	quotaBackendBytes     int64
+	maxTxnOps             uint
+	maxRequestBytes       uint
+	grpcKeepAliveMinTime  time.Duration
+	grpcKeepAliveInterval time.Duration
+	grpcKeepAliveTimeout  time.Duration
 }
 }
 
 
 // mustNewMember return an inited member with the given name. If peerTLS is
 // mustNewMember return an inited member with the given name. If peerTLS is
@@ -558,6 +569,21 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member {
 		m.MaxRequestBytes = embed.DefaultMaxRequestBytes
 		m.MaxRequestBytes = embed.DefaultMaxRequestBytes
 	}
 	}
 	m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough
 	m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough
+
+	m.grpcServerOpts = []grpc.ServerOption{}
+	if mcfg.grpcKeepAliveMinTime > time.Duration(0) {
+		m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
+			MinTime:             mcfg.grpcKeepAliveMinTime,
+			PermitWithoutStream: false,
+		}))
+	}
+	if mcfg.grpcKeepAliveInterval > time.Duration(0) &&
+		mcfg.grpcKeepAliveTimeout > time.Duration(0) {
+		m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
+			Time:    mcfg.grpcKeepAliveInterval,
+			Timeout: mcfg.grpcKeepAliveTimeout,
+		}))
+	}
 	return m
 	return m
 }
 }
 
 
@@ -672,7 +698,7 @@ func (m *member) Launch() error {
 				return err
 				return err
 			}
 			}
 		}
 		}
-		m.grpcServer = v3rpc.Server(m.s, tlscfg)
+		m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...)
 		m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg)
 		m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg)
 		m.serverClient = v3client.New(m.s)
 		m.serverClient = v3client.New(m.s)
 		lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
 		lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))