
*: add max requests bytes, keepalive to server, blackhole methods to integration

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
Gyu-Ho Lee, 8 years ago
parent commit 939337f450

embed/config.go (+47 -20)

@@ -21,6 +21,7 @@ import (
 	"net/http"
 	"net/url"
 	"strings"
+	"time"
 
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/pkg/cors"
@@ -37,9 +38,13 @@ const (
 	ClusterStateFlagNew      = "new"
 	ClusterStateFlagExisting = "existing"
 
-	DefaultName         = "default"
-	DefaultMaxSnapshots = 5
-	DefaultMaxWALs      = 5
+	DefaultName                  = "default"
+	DefaultMaxSnapshots          = 5
+	DefaultMaxWALs               = 5
+	DefaultMaxRequestBytes       = 1.5 * 1024 * 1024
+	DefaultGRPCKeepAliveMinTime  = 5 * time.Second
+	DefaultGRPCKeepAliveInterval = 2 * time.Hour
+	DefaultGRPCKeepAliveTimeout  = 20 * time.Second
 
 	DefaultListenPeerURLs   = "http://localhost:2380"
 	DefaultListenClientURLs = "http://localhost:2379"
@@ -85,6 +90,24 @@ type Config struct {
 	TickMs            uint  `json:"heartbeat-interval"`
 	ElectionMs        uint  `json:"election-timeout"`
 	QuotaBackendBytes int64 `json:"quota-backend-bytes"`
+	MaxRequestBytes   uint  `json:"max-request-bytes"`
+
+	// gRPC server options
+
+	// GRPCKeepAliveMinTime is the minimum interval that a client should
+	// wait before pinging server. When client pings "too fast", server
+	// sends goaway and closes the connection (errors: too_many_pings,
+	// http2.ErrCodeEnhanceYourCalm). When too slow, nothing happens.
+	// Server expects client pings only when there are active streams
+	// (PermitWithoutStream is set to false).
+	GRPCKeepAliveMinTime time.Duration `json:"grpc-keepalive-min-time"`
+	// GRPCKeepAliveInterval is the frequency of server-to-client ping
+	// to check if a connection is alive. Close a non-responsive connection
+	// after an additional duration of Timeout. 0 to disable.
+	GRPCKeepAliveInterval time.Duration `json:"grpc-keepalive-interval"`
+	// GRPCKeepAliveTimeout is the additional duration of wait
+	// before closing a non-responsive connection. 0 to disable.
+	GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"`
 
 	// clustering
 
@@ -167,23 +190,27 @@ func NewConfig() *Config {
 	lcurl, _ := url.Parse(DefaultListenClientURLs)
 	acurl, _ := url.Parse(DefaultAdvertiseClientURLs)
 	cfg := &Config{
-		CorsInfo:            &cors.CORSInfo{},
-		MaxSnapFiles:        DefaultMaxSnapshots,
-		MaxWalFiles:         DefaultMaxWALs,
-		Name:                DefaultName,
-		SnapCount:           etcdserver.DefaultSnapCount,
-		TickMs:              100,
-		ElectionMs:          1000,
-		LPUrls:              []url.URL{*lpurl},
-		LCUrls:              []url.URL{*lcurl},
-		APUrls:              []url.URL{*apurl},
-		ACUrls:              []url.URL{*acurl},
-		ClusterState:        ClusterStateFlagNew,
-		InitialClusterToken: "etcd-cluster",
-		StrictReconfigCheck: true,
-		Metrics:             "basic",
-		EnableV2:            true,
-		AuthToken:           "simple",
+		CorsInfo:              &cors.CORSInfo{},
+		MaxSnapFiles:          DefaultMaxSnapshots,
+		MaxWalFiles:           DefaultMaxWALs,
+		Name:                  DefaultName,
+		SnapCount:             etcdserver.DefaultSnapCount,
+		MaxRequestBytes:       DefaultMaxRequestBytes,
+		GRPCKeepAliveMinTime:  DefaultGRPCKeepAliveMinTime,
+		GRPCKeepAliveInterval: DefaultGRPCKeepAliveInterval,
+		GRPCKeepAliveTimeout:  DefaultGRPCKeepAliveTimeout,
+		TickMs:                100,
+		ElectionMs:            1000,
+		LPUrls:                []url.URL{*lpurl},
+		LCUrls:                []url.URL{*lcurl},
+		APUrls:                []url.URL{*apurl},
+		ACUrls:                []url.URL{*acurl},
+		ClusterState:          ClusterStateFlagNew,
+		InitialClusterToken:   "etcd-cluster",
+		StrictReconfigCheck:   true,
+		Metrics:               "basic",
+		EnableV2:              true,
+		AuthToken:             "simple",
 	}
 	cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
 	return cfg
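
For programs embedding etcd, the new knobs are plain fields on embed.Config. A minimal sketch of how they might be wired up (the values and the data directory are illustrative only, not recommendations from this change):

	package main

	import (
		"log"
		"time"

		"github.com/coreos/etcd/embed"
	)

	func main() {
		cfg := embed.NewConfig()
		cfg.Dir = "default.etcd"
		// Allow client requests up to 2 MiB instead of the 1.5 MiB default.
		cfg.MaxRequestBytes = 2 * 1024 * 1024
		// Reject client pings arriving less than 5s apart, ping idle client
		// connections every 2h, and close them 20s after a missed response.
		cfg.GRPCKeepAliveMinTime = 5 * time.Second
		cfg.GRPCKeepAliveInterval = 2 * time.Hour
		cfg.GRPCKeepAliveTimeout = 20 * time.Second

		e, err := embed.StartEtcd(cfg)
		if err != nil {
			log.Fatal(err)
		}
		defer e.Close()
		<-e.Server.ReadyNotify()
	}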

embed/etcd.go (+18 -1)

@@ -36,6 +36,8 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/pkg/capnslog"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
 )
 
 var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed")
@@ -140,6 +142,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		ElectionTicks:           cfg.ElectionTicks(),
 		AutoCompactionRetention: cfg.AutoCompactionRetention,
 		QuotaBackendBytes:       cfg.QuotaBackendBytes,
+		MaxRequestBytes:         cfg.MaxRequestBytes,
 		StrictReconfigCheck:     cfg.StrictReconfigCheck,
 		ClientCertAuthEnabled:   cfg.ClientTLSInfo.ClientCertAuth,
 		AuthToken:               cfg.AuthToken,
@@ -415,9 +418,23 @@ func (e *Etcd) serve() (err error) {
 	}
 	h = http.Handler(&cors.CORSHandler{Handler: h, Info: e.cfg.CorsInfo})
 
+	gopts := []grpc.ServerOption{}
+	if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
+		gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
+			MinTime:             e.cfg.GRPCKeepAliveMinTime,
+			PermitWithoutStream: false,
+		}))
+	}
+	if e.cfg.GRPCKeepAliveInterval > time.Duration(0) &&
+		e.cfg.GRPCKeepAliveTimeout > time.Duration(0) {
+		gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
+			Time:    e.cfg.GRPCKeepAliveInterval,
+			Timeout: e.cfg.GRPCKeepAliveTimeout,
+		}))
+	}
 	for _, sctx := range e.sctxs {
 		go func(s *serveCtx) {
-			e.errHandler(s.serve(e.Server, ctlscfg, h, e.errHandler))
+			e.errHandler(s.serve(e.Server, ctlscfg, h, e.errHandler, gopts...))
 		}(sctx)
 	}
 	return nil

embed/serve.go (+8 -3)

@@ -66,7 +66,12 @@ func newServeCtx() *serveCtx {
 // serve accepts incoming connections on the listener l,
 // creating a new service goroutine for each. The service goroutines
 // read requests and then call handler to reply to them.
-func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler, errHandler func(error)) error {
+func (sctx *serveCtx) serve(
+	s *etcdserver.EtcdServer,
+	tlscfg *tls.Config,
+	handler http.Handler,
+	errHandler func(error),
+	gopts ...grpc.ServerOption) error {
 	logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
 	<-s.ReadyNotify()
 	plog.Info("ready to serve client requests")
@@ -77,7 +82,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
 	servLock := v3lock.NewLockServer(v3c)
 
 	if sctx.insecure {
-		gs := v3rpc.Server(s, nil)
+		gs := v3rpc.Server(s, nil, gopts...)
 		sctx.grpcServerC <- gs
 		v3electionpb.RegisterElectionServer(gs, servElection)
 		v3lockpb.RegisterLockServer(gs, servLock)
@@ -107,7 +112,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
 	}
 
 	if sctx.secure {
-		gs := v3rpc.Server(s, tlscfg)
+		gs := v3rpc.Server(s, tlscfg, gopts...)
 		sctx.grpcServerC <- gs
 		v3electionpb.RegisterElectionServer(gs, servElection)
 		v3lockpb.RegisterLockServer(gs, servLock)

etcdmain/config.go (+4 -0)

@@ -138,6 +138,10 @@ func newConfig() *config {
 	fs.UintVar(&cfg.TickMs, "heartbeat-interval", cfg.TickMs, "Time (in milliseconds) of a heartbeat interval.")
 	fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.")
 	fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
+	fs.UintVar(&cfg.MaxRequestBytes, "max-request-bytes", cfg.MaxRequestBytes, "Maximum client request size in bytes the server will accept.")
+	fs.DurationVar(&cfg.GRPCKeepAliveMinTime, "grpc-keepalive-min-time", cfg.Config.GRPCKeepAliveMinTime, "Minimum interval duration that a client should wait before pinging server.")
+	fs.DurationVar(&cfg.GRPCKeepAliveInterval, "grpc-keepalive-interval", cfg.Config.GRPCKeepAliveInterval, "Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).")
+	fs.DurationVar(&cfg.GRPCKeepAliveTimeout, "grpc-keepalive-timeout", cfg.Config.GRPCKeepAliveTimeout, "Additional duration of wait before closing a non-responsive connection (0 to disable).")
 
 	// clustering
 	fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")
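
For reference, an invocation exercising the new flags might look like the following (hypothetical values, shown only to illustrate the flag syntax; the durations use Go's time.ParseDuration format):

	etcd --name infra0 \
	  --max-request-bytes 2097152 \
	  --grpc-keepalive-min-time 5s \
	  --grpc-keepalive-interval 2h \
	  --grpc-keepalive-timeout 20s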

etcdmain/help.go (+8 -0)

@@ -66,6 +66,14 @@ member flags:
 		comma-separated whitelist of origins for CORS (cross-origin resource sharing).
 	--quota-backend-bytes '0'
 		raise alarms when backend size exceeds the given quota (0 defaults to low space quota).
+	--max-request-bytes '1572864'
+		maximum client request size in bytes the server will accept.
+	--grpc-keepalive-min-time '5s'
		minimum interval duration that a client should wait before pinging server.
+	--grpc-keepalive-interval '2h'
+		frequency duration of server-to-client ping to check if a connection is alive (0 to disable).
+	--grpc-keepalive-timeout '20s'
+		additional duration of wait before closing a non-responsive connection (0 to disable).
 
 clustering flags:
 

etcdserver/api/v3rpc/grpc.go (+9 -3)

@@ -25,13 +25,17 @@ import (
 	"google.golang.org/grpc/grpclog"
 )
 
-const maxStreams = math.MaxUint32
+const (
+	grpcOverheadBytes = 512 * 1024
+	maxStreams        = math.MaxUint32
+	maxSendBytes      = math.MaxInt32
+)
 
 func init() {
 	grpclog.SetLogger(plog)
 }
 
-func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
+func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
 	var opts []grpc.ServerOption
 	opts = append(opts, grpc.CustomCodec(&codec{}))
 	if tls != nil {
@@ -39,8 +43,10 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
 	}
 	opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s)))
 	opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))
+	opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
+	opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
 	opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
-	grpcServer := grpc.NewServer(opts...)
+	grpcServer := grpc.NewServer(append(opts, gopts...)...)
 
 	pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
 	pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
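
With the default MaxRequestBytes of 1.5 MiB (1572864 bytes), the limit passed to grpc.MaxRecvMsgSize works out to 1572864 + 524288 = 2097152 bytes (2 MiB): the configured request size plus grpcOverheadBytes of headroom for per-request gRPC overhead. Responses remain bounded only by maxSendBytes (math.MaxInt32), so large read responses are unaffected by the new limit.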

etcdserver/api/v3rpc/rpctypes/error.go (+8 -0)

@@ -17,6 +17,7 @@ package rpctypes
 import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 var (
@@ -188,3 +189,10 @@ func Error(err error) error {
 	}
 	return EtcdError{code: grpc.Code(verr), desc: grpc.ErrorDesc(verr)}
 }
+
+func ErrorDesc(err error) string {
+	if s, ok := status.FromError(err); ok {
+		return s.Message()
+	}
+	return err.Error()
+}
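
ErrorDesc returns the description of a status-based gRPC error and falls back to err.Error() otherwise. A sketch of the intended call-site pattern (isTooLarge is a hypothetical helper, not part of this change; it assumes the server's rejection arrives as a status-based error whose message matches rpctypes.ErrRequestTooLarge):

	package example

	import "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"

	// isTooLarge reports whether err is the server's "request is too large"
	// rejection, comparing only the error description.
	func isTooLarge(err error) bool {
		return rpctypes.ErrorDesc(err) == rpctypes.ErrRequestTooLarge.Error()
	}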

etcdserver/config.go (+3 -0)

@@ -55,6 +55,9 @@ type ServerConfig struct {
 	AutoCompactionRetention int
 	QuotaBackendBytes       int64
 
+	// MaxRequestBytes is the maximum request size to send over raft.
+	MaxRequestBytes uint
+
 	StrictReconfigCheck bool
 
 	// ClientCertAuthEnabled is true when cert has been signed by the client CA.

etcdserver/server.go (+6 -1)

@@ -82,7 +82,8 @@ const (
 	releaseDelayAfterSnapshot = 30 * time.Second
 
 	// maxPendingRevokes is the maximum number of outstanding expired lease revocations.
-	maxPendingRevokes = 16
+	maxPendingRevokes          = 16
+	recommendedMaxRequestBytes = 10 * 1024 * 1024
 )
 
 var (
@@ -259,6 +260,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 		cl *membership.RaftCluster
 	)
 
+	if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
+		plog.Warningf("MaxRequestBytes %v exceeds maximum recommended size %v", cfg.MaxRequestBytes, recommendedMaxRequestBytes)
+	}
+
 	if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
 		return nil, fmt.Errorf("cannot access data directory: %v", terr)
 	}

etcdserver/v3_server.go (+1 -7)

@@ -33,12 +33,6 @@ import (
 )
 
 const (
-	// the max request size that raft accepts.
-	// TODO: make this a flag? But we probably do not want to
-	// accept large request which might block raft stream. User
-	// specify a large value might end up with shooting in the foot.
-	maxRequestBytes = 1.5 * 1024 * 1024
-
 	// In the health case, there might be a small gap (10s of entries) between
 	// the applied index and committed index.
 	// However, if the committed entries are very heavy to apply, the gap might grow.
@@ -556,7 +550,7 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
 		return nil, err
 	}
 
-	if len(data) > maxRequestBytes {
+	if len(data) > int(s.Cfg.MaxRequestBytes) {
 		return nil, ErrRequestTooLarge
 	}
 

integration/bridge.go (+57 -10)

@@ -17,6 +17,7 @@ package integration
 import (
 	"fmt"
 	"io"
+	"io/ioutil"
 	"net"
 	"sync"
 
@@ -31,9 +32,10 @@ type bridge struct {
 	l       net.Listener
 	conns   map[*bridgeConn]struct{}
 
-	stopc  chan struct{}
-	pausec chan struct{}
-	wg     sync.WaitGroup
+	stopc      chan struct{}
+	pausec     chan struct{}
+	blackholec chan struct{}
+	wg         sync.WaitGroup
 
 	mu sync.Mutex
 }
@@ -41,11 +43,12 @@ type bridge struct {
 func newBridge(addr string) (*bridge, error) {
 	b := &bridge{
 		// bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number
-		inaddr:  addr + "0",
-		outaddr: addr,
-		conns:   make(map[*bridgeConn]struct{}),
-		stopc:   make(chan struct{}),
-		pausec:  make(chan struct{}),
+		inaddr:     addr + "0",
+		outaddr:    addr,
+		conns:      make(map[*bridgeConn]struct{}),
+		stopc:      make(chan struct{}),
+		pausec:     make(chan struct{}),
+		blackholec: make(chan struct{}),
 	}
 	close(b.pausec)
 
@@ -152,12 +155,12 @@ func (b *bridge) serveConn(bc *bridgeConn) {
 	var wg sync.WaitGroup
 	wg.Add(2)
 	go func() {
-		io.Copy(bc.out, bc.in)
+		b.ioCopy(bc, bc.out, bc.in)
 		bc.close()
 		wg.Done()
 	}()
 	go func() {
-		io.Copy(bc.in, bc.out)
+		b.ioCopy(bc, bc.in, bc.out)
 		bc.close()
 		wg.Done()
 	}()
@@ -179,3 +182,47 @@ func (bc *bridgeConn) close() {
 	bc.in.Close()
 	bc.out.Close()
 }
+
+func (b *bridge) Blackhole() {
+	b.mu.Lock()
+	close(b.blackholec)
+	b.mu.Unlock()
+}
+
+func (b *bridge) Unblackhole() {
+	b.mu.Lock()
+	for bc := range b.conns {
+		bc.Close()
+	}
+	b.conns = make(map[*bridgeConn]struct{})
+	b.blackholec = make(chan struct{})
+	b.mu.Unlock()
+}
+
+// ref. https://github.com/golang/go/blob/master/src/io/io.go copyBuffer
+func (b *bridge) ioCopy(bc *bridgeConn, dst io.Writer, src io.Reader) (err error) {
+	buf := make([]byte, 32*1024)
+	for {
+		select {
+		case <-b.blackholec:
+			io.Copy(ioutil.Discard, src)
+			return nil
+		default:
+		}
+		nr, er := src.Read(buf)
+		if nr > 0 {
+			nw, ew := dst.Write(buf[0:nr])
+			if ew != nil {
+				return ew
+			}
+			if nr != nw {
+				return io.ErrShortWrite
+			}
+		}
+		if er != nil {
+			err = er
+			break
+		}
+	}
+	return
+}

integration/cluster.go (+69 -28)

@@ -31,11 +31,9 @@ import (
 	"testing"
 	"time"
 
-	"golang.org/x/net/context"
-	"google.golang.org/grpc"
-
 	"github.com/coreos/etcd/client"
 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/embed"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/etcdhttp"
 	"github.com/coreos/etcd/etcdserver/api/v2http"
@@ -50,7 +48,11 @@ import (
 	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/rafthttp"
+
 	"github.com/coreos/pkg/capnslog"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
 )
 
 const (
@@ -88,12 +90,18 @@ var (
 )
 
 type ClusterConfig struct {
-	Size              int
-	PeerTLS           *transport.TLSInfo
-	ClientTLS         *transport.TLSInfo
-	DiscoveryURL      string
-	UseGRPC           bool
-	QuotaBackendBytes int64
+	Size                  int
+	PeerTLS               *transport.TLSInfo
+	ClientTLS             *transport.TLSInfo
+	DiscoveryURL          string
+	UseGRPC               bool
+	QuotaBackendBytes     int64
+	MaxRequestBytes       uint
+	GRPCKeepAliveMinTime  time.Duration
+	GRPCKeepAliveInterval time.Duration
+	GRPCKeepAliveTimeout  time.Duration
+	// SkipCreatingClient, when true, skips creating clients for each member.
+	SkipCreatingClient bool
 }
 
 type cluster struct {
@@ -221,10 +229,14 @@ func (c *cluster) HTTPMembers() []client.Member {
 func (c *cluster) mustNewMember(t *testing.T) *member {
 	m := mustNewMember(t,
 		memberConfig{
-			name:              c.name(rand.Int()),
-			peerTLS:           c.cfg.PeerTLS,
-			clientTLS:         c.cfg.ClientTLS,
-			quotaBackendBytes: c.cfg.QuotaBackendBytes,
+			name:                  c.name(rand.Int()),
+			peerTLS:               c.cfg.PeerTLS,
+			clientTLS:             c.cfg.ClientTLS,
+			quotaBackendBytes:     c.cfg.QuotaBackendBytes,
+			maxRequestBytes:       c.cfg.MaxRequestBytes,
+			grpcKeepAliveMinTime:  c.cfg.GRPCKeepAliveMinTime,
+			grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
+			grpcKeepAliveTimeout:  c.cfg.GRPCKeepAliveTimeout,
 		})
 	m.DiscoveryURL = c.cfg.DiscoveryURL
 	if c.cfg.UseGRPC {
@@ -474,9 +486,10 @@ type member struct {
 	s           *etcdserver.EtcdServer
 	hss         []*httptest.Server
 
-	grpcServer *grpc.Server
-	grpcAddr   string
-	grpcBridge *bridge
+	grpcServerOpts []grpc.ServerOption
+	grpcServer     *grpc.Server
+	grpcAddr       string
+	grpcBridge     *bridge
 
 	// serverClient is a clientv3 that directly calls the etcdserver.
 	serverClient *clientv3.Client
@@ -487,10 +500,14 @@ type member struct {
 func (m *member) GRPCAddr() string { return m.grpcAddr }
 
 type memberConfig struct {
-	name              string
-	peerTLS           *transport.TLSInfo
-	clientTLS         *transport.TLSInfo
-	quotaBackendBytes int64
+	name                  string
+	peerTLS               *transport.TLSInfo
+	clientTLS             *transport.TLSInfo
+	quotaBackendBytes     int64
+	maxRequestBytes       uint
+	grpcKeepAliveMinTime  time.Duration
+	grpcKeepAliveInterval time.Duration
+	grpcKeepAliveTimeout  time.Duration
 }
 
 // mustNewMember return an inited member with the given name. If peerTLS is
@@ -538,7 +555,26 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member {
 	m.ElectionTicks = electionTicks
 	m.TickMs = uint(tickDuration / time.Millisecond)
 	m.QuotaBackendBytes = mcfg.quotaBackendBytes
+	m.MaxRequestBytes = mcfg.maxRequestBytes
+	if m.MaxRequestBytes == 0 {
+		m.MaxRequestBytes = embed.DefaultMaxRequestBytes
+	}
 	m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough
+
+	m.grpcServerOpts = []grpc.ServerOption{}
+	if mcfg.grpcKeepAliveMinTime > time.Duration(0) {
+		m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
+			MinTime:             mcfg.grpcKeepAliveMinTime,
+			PermitWithoutStream: false,
+		}))
+	}
+	if mcfg.grpcKeepAliveInterval > time.Duration(0) &&
+		mcfg.grpcKeepAliveTimeout > time.Duration(0) {
+		m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
+			Time:    mcfg.grpcKeepAliveInterval,
+			Timeout: mcfg.grpcKeepAliveTimeout,
+		}))
+	}
 	return m
 }
 
@@ -567,6 +603,8 @@ func (m *member) electionTimeout() time.Duration {
 func (m *member) DropConnections()    { m.grpcBridge.Reset() }
 func (m *member) PauseConnections()   { m.grpcBridge.Pause() }
 func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
+func (m *member) Blackhole()          { m.grpcBridge.Blackhole() }
+func (m *member) Unblackhole()        { m.grpcBridge.Unblackhole() }
 
 // NewClientV3 creates a new grpc client connection to the member
 func NewClientV3(m *member) (*clientv3.Client, error) {
@@ -676,7 +714,7 @@ func (m *member) Launch() error {
 				return err
 			}
 		}
-		m.grpcServer = v3rpc.Server(m.s, tlscfg)
+		m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...)
 		m.serverClient = v3client.New(m.s)
 		lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
 		epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient))
@@ -824,7 +862,7 @@ func (m *member) Metric(metricName string) (string, error) {
 }
 
 // InjectPartition drops connections from m to others, vice versa.
-func (m *member) InjectPartition(t *testing.T, others []*member) {
+func (m *member) InjectPartition(t *testing.T, others ...*member) {
 	for _, other := range others {
 		m.s.CutPeer(other.s.ID())
 		other.s.CutPeer(m.s.ID())
@@ -832,7 +870,7 @@ func (m *member) InjectPartition(t *testing.T, others []*member) {
 }
 
 // RecoverPartition recovers connections from m to others, vice versa.
-func (m *member) RecoverPartition(t *testing.T, others []*member) {
+func (m *member) RecoverPartition(t *testing.T, others ...*member) {
 	for _, other := range others {
 		m.s.MendPeer(other.s.ID())
 		other.s.MendPeer(m.s.ID())
@@ -884,12 +922,15 @@ func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
 		cluster: NewClusterByConfig(t, cfg),
 	}
 	clus.Launch(t)
-	for _, m := range clus.Members {
-		client, err := NewClientV3(m)
-		if err != nil {
-			t.Fatalf("cannot create client: %v", err)
+
+	if !cfg.SkipCreatingClient {
+		for _, m := range clus.Members {
+			client, err := NewClientV3(m)
+			if err != nil {
+				t.Fatalf("cannot create client: %v", err)
+			}
+			clus.clients = append(clus.clients, client)
 		}
-		clus.clients = append(clus.clients, client)
 	}
 
 	return clus
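
Combined with the bridge's new blackhole methods, these ClusterConfig fields let a test hold a client connection open while silently dropping its traffic, then rely on the server-side keepalive to tear it down. A rough sketch of the intended shape (the test name, timings, and sleep-based wait are hypothetical):

	func TestGRPCKeepAliveClosesBlackholedConn(t *testing.T) {
		defer testutil.AfterTest(t)
		clus := NewClusterV3(t, &ClusterConfig{
			Size:                  1,
			GRPCKeepAliveInterval: 2 * time.Second, // server pings every 2s
			GRPCKeepAliveTimeout:  time.Second,     // close 1s after a missed pong
		})
		defer clus.Terminate(t)

		// Drop all bytes across the bridge without closing the TCP connection;
		// the server's keepalive should eventually close its side.
		clus.Members[0].Blackhole()
		time.Sleep(5 * time.Second)
		clus.Members[0].Unblackhole()
	}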

integration/network_partition_test.go (+2 -2)

@@ -149,12 +149,12 @@ func getMembersByIndexSlice(clus *cluster, idxs []int) []*member {
 
 func injectPartition(t *testing.T, src, others []*member) {
 	for _, m := range src {
-		m.InjectPartition(t, others)
+		m.InjectPartition(t, others...)
 	}
 }
 
 func recoverPartition(t *testing.T, src, others []*member) {
 	for _, m := range src {
-		m.RecoverPartition(t, others)
+		m.RecoverPartition(t, others...)
 	}
 }

integration/v3_grpc_test.go (+35 -6)

@@ -1372,7 +1372,7 @@ func TestTLSGRPCRejectInsecureClient(t *testing.T) {
 	// nil out TLS field so client will use an insecure connection
 	clus.Members[0].ClientTLSInfo = nil
 	client, err := NewClientV3(clus.Members[0])
-	if err != nil && err != grpc.ErrClientConnTimeout {
+	if err != nil && err != context.DeadlineExceeded {
 		t.Fatalf("unexpected error (%v)", err)
 	} else if client == nil {
 		// Ideally, no client would be returned. However, grpc will
@@ -1408,7 +1408,7 @@ func TestTLSGRPCRejectSecureClient(t *testing.T) {
 	client, err := NewClientV3(clus.Members[0])
 	if client != nil || err == nil {
 		t.Fatalf("expected no client")
-	} else if err != grpc.ErrClientConnTimeout {
+	} else if err != context.DeadlineExceeded {
 		t.Fatalf("unexpected error (%v)", err)
 	}
 }
@@ -1565,8 +1565,8 @@ func testTLSReload(t *testing.T, cloneFunc func() transport.TLSInfo, replaceFunc
 	// 5. expect dial time-out when loading expired certs
 	select {
 	case gerr := <-errc:
-		if gerr != grpc.ErrClientConnTimeout {
-			t.Fatalf("expected %v, got %v", grpc.ErrClientConnTimeout, gerr)
+		if gerr != context.DeadlineExceeded {
+			t.Fatalf("expected %v, got %v", context.DeadlineExceeded, gerr)
 		}
 	case <-time.After(5 * time.Second):
 		t.Fatal("failed to receive dial timeout error")
@@ -1611,7 +1611,7 @@ func TestGRPCRequireLeader(t *testing.T) {
 	time.Sleep(time.Duration(3*electionTicks) * tickDuration)
 
 	md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
-	ctx := metadata.NewContext(context.Background(), md)
+	ctx := metadata.NewOutgoingContext(context.Background(), md)
 	reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
 	if _, err := toGRPC(client).KV.Put(ctx, reqput); grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
 		t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
@@ -1633,7 +1633,7 @@ func TestGRPCStreamRequireLeader(t *testing.T) {
 
 	wAPI := toGRPC(client).Watch
 	md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
-	ctx := metadata.NewContext(context.Background(), md)
+	ctx := metadata.NewOutgoingContext(context.Background(), md)
 	wStream, err := wAPI.Watch(ctx)
 	if err != nil {
 		t.Fatalf("wAPI.Watch error: %v", err)
@@ -1680,6 +1680,35 @@ func TestGRPCStreamRequireLeader(t *testing.T) {
 	}
 }
 
+// TestV3PutLargeRequests ensures that configurable MaxRequestBytes works as intended.
+func TestV3PutLargeRequests(t *testing.T) {
+	defer testutil.AfterTest(t)
+	tests := []struct {
+		key             string
+		maxRequestBytes uint
+		valueSize       int
+		expectError     error
+	}{
+		// maxRequestBytes must not be 0 here; 0 falls back to the server default.
+		{"foo", 1, 1024, rpctypes.ErrGRPCRequestTooLarge},
+		{"foo", 10 * 1024 * 1024, 9 * 1024 * 1024, nil},
+		{"foo", 10 * 1024 * 1024, 10 * 1024 * 1024, rpctypes.ErrGRPCRequestTooLarge},
+		{"foo", 10 * 1024 * 1024, 10*1024*1024 + 5, rpctypes.ErrGRPCRequestTooLarge},
+	}
+	for i, test := range tests {
+		clus := NewClusterV3(t, &ClusterConfig{Size: 1, MaxRequestBytes: test.maxRequestBytes})
+		kvcli := toGRPC(clus.Client(0)).KV
+		reqput := &pb.PutRequest{Key: []byte(test.key), Value: make([]byte, test.valueSize)}
+		_, err := kvcli.Put(context.TODO(), reqput)
+
+		if !eqErrGRPC(err, test.expectError) {
+			t.Errorf("#%d: expected error %v, got %v", i, test.expectError, err)
+		}
+
+		clus.Terminate(t)
+	}
+}
+
 func eqErrGRPC(err1 error, err2 error) bool {
 	return !(err1 == nil && err2 != nil) || err1.Error() == err2.Error()
 }