Forráskód Böngészése

clientv3: call other APIs with default gRPC call options

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 8 éve
szülő
commit
497412c588

+ 33 - 23
clientv3/auth.go

@@ -101,60 +101,65 @@ type Auth interface {
 }
 
 type auth struct {
-	remote pb.AuthClient
+	remote   pb.AuthClient
+	callOpts []grpc.CallOption
 }
 
 func NewAuth(c *Client) Auth {
-	return &auth{remote: RetryAuthClient(c)}
+	api := &auth{remote: RetryAuthClient(c)}
+	if c != nil {
+		api.callOpts = c.callOpts
+	}
+	return api
 }
 
 func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
-	resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{})
+	resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, auth.callOpts...)
 	return (*AuthEnableResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
-	resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{})
+	resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, auth.callOpts...)
 	return (*AuthDisableResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
-	resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password})
+	resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, auth.callOpts...)
 	return (*AuthUserAddResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
-	resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name})
+	resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, auth.callOpts...)
 	return (*AuthUserDeleteResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
-	resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password})
+	resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, auth.callOpts...)
 	return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
-	resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role})
+	resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, auth.callOpts...)
 	return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
-	resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name})
+	resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, auth.callOpts...)
 	return (*AuthUserGetResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) {
-	resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{})
+	resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, auth.callOpts...)
 	return (*AuthUserListResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
-	resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role})
+	resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, auth.callOpts...)
 	return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
-	resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name})
+	resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, auth.callOpts...)
 	return (*AuthRoleAddResponse)(resp), toErr(ctx, err)
 }
 
@@ -164,27 +169,27 @@ func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, ran
 		RangeEnd: []byte(rangeEnd),
 		PermType: authpb.Permission_Type(permType),
 	}
-	resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm})
+	resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}, auth.callOpts...)
 	return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
-	resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role})
+	resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, auth.callOpts...)
 	return (*AuthRoleGetResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
-	resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{})
+	resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, auth.callOpts...)
 	return (*AuthRoleListResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
-	resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd})
+	resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, auth.callOpts...)
 	return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
-	resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role})
+	resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, auth.callOpts...)
 	return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err)
 }
 
@@ -197,12 +202,13 @@ func StrToPermissionType(s string) (PermissionType, error) {
 }
 
 type authenticator struct {
-	conn   *grpc.ClientConn // conn in-use
-	remote pb.AuthClient
+	conn     *grpc.ClientConn // conn in-use
+	remote   pb.AuthClient
+	callOpts []grpc.CallOption
 }
 
 func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
-	resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password})
+	resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, auth.callOpts...)
 	return (*AuthenticateResponse)(resp), toErr(ctx, err)
 }
 
@@ -210,14 +216,18 @@ func (auth *authenticator) close() {
 	auth.conn.Close()
 }
 
-func newAuthenticator(endpoint string, opts []grpc.DialOption) (*authenticator, error) {
+func newAuthenticator(endpoint string, opts []grpc.DialOption, c *Client) (*authenticator, error) {
 	conn, err := grpc.Dial(endpoint, opts...)
 	if err != nil {
 		return nil, err
 	}
 
-	return &authenticator{
+	api := &authenticator{
 		conn:   conn,
 		remote: pb.NewAuthClient(conn),
-	}, nil
+	}
+	if c != nil {
+		api.callOpts = c.callOpts
+	}
+	return api, nil
 }

+ 1 - 1
clientv3/client.go

@@ -297,7 +297,7 @@ func (c *Client) getToken(ctx context.Context) error {
 		endpoint := c.cfg.Endpoints[i]
 		host := getHost(endpoint)
 		// use dial options without dopts to avoid reusing the client balancer
-		auth, err = newAuthenticator(host, c.dialSetupOpts(endpoint))
+		auth, err = newAuthenticator(host, c.dialSetupOpts(endpoint), c)
 		if err != nil {
 			continue
 		}

+ 19 - 8
clientv3/cluster.go

@@ -18,6 +18,8 @@ import (
 	"context"
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+	"google.golang.org/grpc"
 )
 
 type (
@@ -43,20 +45,29 @@ type Cluster interface {
 }
 
 type cluster struct {
-	remote pb.ClusterClient
+	remote   pb.ClusterClient
+	callOpts []grpc.CallOption
 }
 
 func NewCluster(c *Client) Cluster {
-	return &cluster{remote: RetryClusterClient(c)}
+	api := &cluster{remote: RetryClusterClient(c)}
+	if c != nil {
+		api.callOpts = c.callOpts
+	}
+	return api
 }
 
-func NewClusterFromClusterClient(remote pb.ClusterClient) Cluster {
-	return &cluster{remote: remote}
+func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster {
+	api := &cluster{remote: remote}
+	if c != nil {
+		api.callOpts = c.callOpts
+	}
+	return api
 }
 
 func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
 	r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
-	resp, err := c.remote.MemberAdd(ctx, r)
+	resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...)
 	if err != nil {
 		return nil, toErr(ctx, err)
 	}
@@ -65,7 +76,7 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd
 
 func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
 	r := &pb.MemberRemoveRequest{ID: id}
-	resp, err := c.remote.MemberRemove(ctx, r)
+	resp, err := c.remote.MemberRemove(ctx, r, c.callOpts...)
 	if err != nil {
 		return nil, toErr(ctx, err)
 	}
@@ -75,7 +86,7 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes
 func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
 	// it is safe to retry on update.
 	r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
-	resp, err := c.remote.MemberUpdate(ctx, r)
+	resp, err := c.remote.MemberUpdate(ctx, r, c.callOpts...)
 	if err == nil {
 		return (*MemberUpdateResponse)(resp), nil
 	}
@@ -84,7 +95,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin
 
 func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
 	// it is safe to retry on list.
-	resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{})
+	resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}, c.callOpts...)
 	if err == nil {
 		return (*MemberListResponse)(resp), nil
 	}

+ 14 - 8
clientv3/lease.go

@@ -22,6 +22,7 @@ import (
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 
+	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
 )
 
@@ -166,6 +167,8 @@ type lessor struct {
 
 	// firstKeepAliveOnce ensures stream starts after first KeepAlive call.
 	firstKeepAliveOnce sync.Once
+
+	callOpts []grpc.CallOption
 }
 
 // keepAlive multiplexes a keepalive for a lease over multiple channels
@@ -181,10 +184,10 @@ type keepAlive struct {
 }
 
 func NewLease(c *Client) Lease {
-	return NewLeaseFromLeaseClient(RetryLeaseClient(c), c.cfg.DialTimeout+time.Second)
+	return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second)
 }
 
-func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Duration) Lease {
+func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
 	l := &lessor{
 		donec:                 make(chan struct{}),
 		keepAlives:            make(map[LeaseID]*keepAlive),
@@ -194,6 +197,9 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati
 	if l.firstKeepAliveTimeout == time.Second {
 		l.firstKeepAliveTimeout = defaultTTL
 	}
+	if c != nil {
+		l.callOpts = c.callOpts
+	}
 	reqLeaderCtx := WithRequireLeader(context.Background())
 	l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
 	return l
@@ -201,7 +207,7 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati
 
 func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
 	r := &pb.LeaseGrantRequest{TTL: ttl}
-	resp, err := l.remote.LeaseGrant(ctx, r)
+	resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
 	if err == nil {
 		gresp := &LeaseGrantResponse{
 			ResponseHeader: resp.GetHeader(),
@@ -216,7 +222,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
 
 func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
 	r := &pb.LeaseRevokeRequest{ID: int64(id)}
-	resp, err := l.remote.LeaseRevoke(ctx, r)
+	resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
 	if err == nil {
 		return (*LeaseRevokeResponse)(resp), nil
 	}
@@ -225,7 +231,7 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
 
 func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
 	r := toLeaseTimeToLiveRequest(id, opts...)
-	resp, err := l.remote.LeaseTimeToLive(ctx, r)
+	resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...)
 	if err == nil {
 		gresp := &LeaseTimeToLiveResponse{
 			ResponseHeader: resp.GetHeader(),
@@ -240,7 +246,7 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
 }
 
 func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
-	resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{})
+	resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, l.callOpts...)
 	if err == nil {
 		leases := make([]LeaseStatus, len(resp.Leases))
 		for i := range resp.Leases {
@@ -389,7 +395,7 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
 	cctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	stream, err := l.remote.LeaseKeepAlive(cctx)
+	stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...)
 	if err != nil {
 		return nil, toErr(ctx, err)
 	}
@@ -460,7 +466,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
 // resetRecv opens a new lease stream and starts sending keep alive requests.
 func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
 	sctx, cancel := context.WithCancel(l.stopCtx)
-	stream, err := l.remote.LeaseKeepAlive(sctx)
+	stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
 	if err != nil {
 		cancel()
 		return nil, err

+ 23 - 12
clientv3/maintenance.go

@@ -19,6 +19,8 @@ import (
 	"io"
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+	"google.golang.org/grpc"
 )
 
 type (
@@ -63,12 +65,13 @@ type Maintenance interface {
 }
 
 type maintenance struct {
-	dial   func(endpoint string) (pb.MaintenanceClient, func(), error)
-	remote pb.MaintenanceClient
+	dial     func(endpoint string) (pb.MaintenanceClient, func(), error)
+	remote   pb.MaintenanceClient
+	callOpts []grpc.CallOption
 }
 
 func NewMaintenance(c *Client) Maintenance {
-	return &maintenance{
+	api := &maintenance{
 		dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
 			conn, err := c.dial(endpoint)
 			if err != nil {
@@ -79,15 +82,23 @@ func NewMaintenance(c *Client) Maintenance {
 		},
 		remote: RetryMaintenanceClient(c, c.conn),
 	}
+	if c != nil {
+		api.callOpts = c.callOpts
+	}
+	return api
 }
 
-func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient) Maintenance {
-	return &maintenance{
+func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance {
+	api := &maintenance{
 		dial: func(string) (pb.MaintenanceClient, func(), error) {
 			return remote, func() {}, nil
 		},
 		remote: remote,
 	}
+	if c != nil {
+		api.callOpts = c.callOpts
+	}
+	return api
 }
 
 func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
@@ -96,7 +107,7 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
 		MemberID: 0,                 // all
 		Alarm:    pb.AlarmType_NONE, // all
 	}
-	resp, err := m.remote.Alarm(ctx, req)
+	resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
 	if err == nil {
 		return (*AlarmResponse)(resp), nil
 	}
@@ -126,7 +137,7 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR
 		return &ret, nil
 	}
 
-	resp, err := m.remote.Alarm(ctx, req)
+	resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
 	if err == nil {
 		return (*AlarmResponse)(resp), nil
 	}
@@ -139,7 +150,7 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm
 		return nil, toErr(ctx, err)
 	}
 	defer cancel()
-	resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{})
+	resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, m.callOpts...)
 	if err != nil {
 		return nil, toErr(ctx, err)
 	}
@@ -152,7 +163,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo
 		return nil, toErr(ctx, err)
 	}
 	defer cancel()
-	resp, err := remote.Status(ctx, &pb.StatusRequest{})
+	resp, err := remote.Status(ctx, &pb.StatusRequest{}, m.callOpts...)
 	if err != nil {
 		return nil, toErr(ctx, err)
 	}
@@ -165,7 +176,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*
 		return nil, toErr(ctx, err)
 	}
 	defer cancel()
-	resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev})
+	resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, m.callOpts...)
 	if err != nil {
 		return nil, toErr(ctx, err)
 	}
@@ -173,7 +184,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*
 }
 
 func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
-	ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{})
+	ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, m.callOpts...)
 	if err != nil {
 		return nil, toErr(ctx, err)
 	}
@@ -210,6 +221,6 @@ func (rc *snapshotReadCloser) Read(p []byte) (n int, err error) {
 }
 
 func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
-	resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID})
+	resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, m.callOpts...)
 	return (*MoveLeaderResponse)(resp), toErr(ctx, err)
 }

+ 14 - 7
clientv3/watch.go

@@ -106,7 +106,8 @@ func (wr *WatchResponse) IsProgressNotify() bool {
 
 // watcher implements the Watcher interface
 type watcher struct {
-	remote pb.WatchClient
+	remote   pb.WatchClient
+	callOpts []grpc.CallOption
 
 	// mu protects the grpc streams map
 	mu sync.RWMutex
@@ -117,8 +118,9 @@ type watcher struct {
 
 // watchGrpcStream tracks all watch resources attached to a single grpc stream.
 type watchGrpcStream struct {
-	owner  *watcher
-	remote pb.WatchClient
+	owner    *watcher
+	remote   pb.WatchClient
+	callOpts []grpc.CallOption
 
 	// ctx controls internal remote.Watch requests
 	ctx context.Context
@@ -189,14 +191,18 @@ type watcherStream struct {
 }
 
 func NewWatcher(c *Client) Watcher {
-	return NewWatchFromWatchClient(pb.NewWatchClient(c.conn))
+	return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
 }
 
-func NewWatchFromWatchClient(wc pb.WatchClient) Watcher {
-	return &watcher{
+func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
+	w := &watcher{
 		remote:  wc,
 		streams: make(map[string]*watchGrpcStream),
 	}
+	if c != nil {
+		w.callOpts = c.callOpts
+	}
+	return w
 }
 
 // never closes
@@ -215,6 +221,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
 	wgs := &watchGrpcStream{
 		owner:      w,
 		remote:     w.remote,
+		callOpts:   w.callOpts,
 		ctx:        ctx,
 		ctxKey:     streamKeyFromCtx(inctx),
 		cancel:     cancel,
@@ -775,7 +782,7 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
 			return nil, err
 		default:
 		}
-		if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil {
+		if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
 			break
 		}
 		if isHaltErr(w.ctx, err) {

+ 4 - 4
etcdserver/api/v3client/v3client.go

@@ -34,16 +34,16 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
 	c.KV = clientv3.NewKVFromKVClient(kvc, c)
 
 	lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s))
-	c.Lease = clientv3.NewLeaseFromLeaseClient(lc, time.Second)
+	c.Lease = clientv3.NewLeaseFromLeaseClient(lc, c, time.Second)
 
 	wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
-	c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc)}
+	c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)}
 
 	mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s))
-	c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc)
+	c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c)
 
 	clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s))
-	c.Cluster = clientv3.NewClusterFromClusterClient(clc)
+	c.Cluster = clientv3.NewClusterFromClusterClient(clc, c)
 
 	// TODO: implement clientv3.Auth interface?
 

+ 2 - 2
integration/cluster_proxy.go

@@ -102,9 +102,9 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
 	c.KV = clientv3.NewKVFromKVClient(rpc.KV, c)
 	pmu.Lock()
 	lc := c.Lease
-	c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout)
+	c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, c, cfg.DialTimeout)
 	c.Watcher = &proxyCloser{
-		Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch),
+		Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch, c),
 		wdonec:  proxies[c].wdonec,
 		kvdonec: proxies[c].kvdonec,
 		lclose:  func() { lc.Close() },