Browse Source

clientv3: use failfast and retry wrappers for at-most-once rpcs

Anthony Romano 9 years ago
parent
commit
3eadf964f4
8 changed files with 285 additions and 30 deletions
  1. 11 11
      clientv3/auth.go
  2. 6 0
      clientv3/balancer.go
  3. 14 7
      clientv3/client.go
  4. 4 4
      clientv3/cluster.go
  5. 3 3
      clientv3/kv.go
  6. 3 3
      clientv3/lease.go
  7. 243 0
      clientv3/retry.go
  8. 1 2
      clientv3/txn.go

+ 11 - 11
clientv3/auth.go

@@ -115,32 +115,32 @@ func NewAuth(c *Client) Auth {
 }
 }
 
 
 func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
 func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
-	resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, grpc.FailFast(false))
+	resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{})
 	return (*AuthEnableResponse)(resp), toErr(ctx, err)
 	return (*AuthEnableResponse)(resp), toErr(ctx, err)
 }
 }
 
 
 func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
 func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
-	resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, grpc.FailFast(false))
+	resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{})
 	return (*AuthDisableResponse)(resp), toErr(ctx, err)
 	return (*AuthDisableResponse)(resp), toErr(ctx, err)
 }
 }
 
 
 func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
 func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
-	resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, grpc.FailFast(false))
+	resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password})
 	return (*AuthUserAddResponse)(resp), toErr(ctx, err)
 	return (*AuthUserAddResponse)(resp), toErr(ctx, err)
 }
 }
 
 
 func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
 func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
-	resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, grpc.FailFast(false))
+	resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name})
 	return (*AuthUserDeleteResponse)(resp), toErr(ctx, err)
 	return (*AuthUserDeleteResponse)(resp), toErr(ctx, err)
 }
 }
 
 
 func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
 func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
-	resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, grpc.FailFast(false))
+	resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password})
 	return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err)
 	return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err)
 }
 }
 
 
 func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
 func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
-	resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, grpc.FailFast(false))
+	resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role})
 	return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err)
 	return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err)
 }
 }
 
 
@@ -155,12 +155,12 @@ func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) {
 }
 }
 
 
 func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
 func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
-	resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, grpc.FailFast(false))
+	resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role})
 	return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err)
 	return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err)
 }
 }
 
 
 func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
 func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
-	resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, grpc.FailFast(false))
+	resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name})
 	return (*AuthRoleAddResponse)(resp), toErr(ctx, err)
 	return (*AuthRoleAddResponse)(resp), toErr(ctx, err)
 }
 }
 
 
@@ -170,7 +170,7 @@ func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, ran
 		RangeEnd: []byte(rangeEnd),
 		RangeEnd: []byte(rangeEnd),
 		PermType: authpb.Permission_Type(permType),
 		PermType: authpb.Permission_Type(permType),
 	}
 	}
-	resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}, grpc.FailFast(false))
+	resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm})
 	return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err)
 	return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err)
 }
 }
 
 
@@ -185,12 +185,12 @@ func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
 }
 }
 
 
 func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
 func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
-	resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, grpc.FailFast(false))
+	resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd})
 	return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err)
 	return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err)
 }
 }
 
 
 func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
 func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
-	resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, grpc.FailFast(false))
+	resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role})
 	return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err)
 	return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err)
 }
 }
 
 

+ 6 - 0
clientv3/balancer.go

@@ -66,6 +66,12 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
 
 
 func (b *simpleBalancer) Start(target string) error { return nil }
 func (b *simpleBalancer) Start(target string) error { return nil }
 
 
+func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	return b.upc
+}
+
 func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 	b.mu.Lock()
 	b.mu.Lock()
 	if len(b.upEps) == 0 {
 	if len(b.upEps) == 0 {

+ 14 - 7
clientv3/client.go

@@ -46,9 +46,11 @@ type Client struct {
 	Auth
 	Auth
 	Maintenance
 	Maintenance
 
 
-	conn  *grpc.ClientConn
-	cfg   Config
-	creds *credentials.TransportCredentials
+	conn         *grpc.ClientConn
+	cfg          Config
+	creds        *credentials.TransportCredentials
+	balancer     *simpleBalancer
+	retryWrapper retryRpcFunc
 
 
 	ctx    context.Context
 	ctx    context.Context
 	cancel context.CancelFunc
 	cancel context.CancelFunc
@@ -239,12 +241,13 @@ func newClient(cfg *Config) (*Client, error) {
 		client.Password = cfg.Password
 		client.Password = cfg.Password
 	}
 	}
 
 
-	b := newSimpleBalancer(cfg.Endpoints)
-	conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(b))
+	client.balancer = newSimpleBalancer(cfg.Endpoints)
+	conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 	client.conn = conn
 	client.conn = conn
+	client.retryWrapper = client.newRetryWrapper()
 
 
 	// wait for a connection
 	// wait for a connection
 	if cfg.DialTimeout > 0 {
 	if cfg.DialTimeout > 0 {
@@ -296,8 +299,12 @@ func isHaltErr(ctx context.Context, err error) bool {
 		return eErr != rpctypes.ErrStopped && eErr != rpctypes.ErrNoLeader
 		return eErr != rpctypes.ErrStopped && eErr != rpctypes.ErrNoLeader
 	}
 	}
 	// treat etcdserver errors not recognized by the client as halting
 	// treat etcdserver errors not recognized by the client as halting
-	return strings.Contains(err.Error(), grpc.ErrClientConnClosing.Error()) ||
-		strings.Contains(err.Error(), "etcdserver:")
+	return isConnClosing(err) || strings.Contains(err.Error(), "etcdserver:")
+}
+
+// isConnClosing returns true if the error matches a grpc client closing error
+func isConnClosing(err error) bool {
+	return strings.Contains(err.Error(), grpc.ErrClientConnClosing.Error())
 }
 }
 
 
 func toErr(ctx context.Context, err error) error {
 func toErr(ctx context.Context, err error) error {

+ 4 - 4
clientv3/cluster.go

@@ -47,12 +47,12 @@ type cluster struct {
 }
 }
 
 
 func NewCluster(c *Client) Cluster {
 func NewCluster(c *Client) Cluster {
-	return &cluster{remote: pb.NewClusterClient(c.conn)}
+	return &cluster{remote: RetryClusterClient(c)}
 }
 }
 
 
 func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
 func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
 	r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
 	r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
-	resp, err := c.remote.MemberAdd(ctx, r, grpc.FailFast(false))
+	resp, err := c.remote.MemberAdd(ctx, r)
 	if err == nil {
 	if err == nil {
 		return (*MemberAddResponse)(resp), nil
 		return (*MemberAddResponse)(resp), nil
 	}
 	}
@@ -64,7 +64,7 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd
 
 
 func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
 func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
 	r := &pb.MemberRemoveRequest{ID: id}
 	r := &pb.MemberRemoveRequest{ID: id}
-	resp, err := c.remote.MemberRemove(ctx, r, grpc.FailFast(false))
+	resp, err := c.remote.MemberRemove(ctx, r)
 	if err == nil {
 	if err == nil {
 		return (*MemberRemoveResponse)(resp), nil
 		return (*MemberRemoveResponse)(resp), nil
 	}
 	}
@@ -78,7 +78,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin
 	// it is safe to retry on update.
 	// it is safe to retry on update.
 	for {
 	for {
 		r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
 		r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
-		resp, err := c.remote.MemberUpdate(ctx, r, grpc.FailFast(false))
+		resp, err := c.remote.MemberUpdate(ctx, r)
 		if err == nil {
 		if err == nil {
 			return (*MemberUpdateResponse)(resp), nil
 			return (*MemberUpdateResponse)(resp), nil
 		}
 		}

+ 3 - 3
clientv3/kv.go

@@ -82,7 +82,7 @@ type kv struct {
 }
 }
 
 
 func NewKV(c *Client) KV {
 func NewKV(c *Client) KV {
-	return &kv{remote: pb.NewKVClient(c.conn)}
+	return &kv{remote: RetryKVClient(c)}
 }
 }
 
 
 func NewKVFromKVClient(remote pb.KVClient) KV {
 func NewKVFromKVClient(remote pb.KVClient) KV {
@@ -162,14 +162,14 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
 	case tPut:
 	case tPut:
 		var resp *pb.PutResponse
 		var resp *pb.PutResponse
 		r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV}
 		r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV}
-		resp, err = kv.remote.Put(ctx, r, grpc.FailFast(false))
+		resp, err = kv.remote.Put(ctx, r)
 		if err == nil {
 		if err == nil {
 			return OpResponse{put: (*PutResponse)(resp)}, nil
 			return OpResponse{put: (*PutResponse)(resp)}, nil
 		}
 		}
 	case tDeleteRange:
 	case tDeleteRange:
 		var resp *pb.DeleteRangeResponse
 		var resp *pb.DeleteRangeResponse
 		r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
 		r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
-		resp, err = kv.remote.DeleteRange(ctx, r, grpc.FailFast(false))
+		resp, err = kv.remote.DeleteRange(ctx, r)
 		if err == nil {
 		if err == nil {
 			return OpResponse{del: (*DeleteResponse)(resp)}, nil
 			return OpResponse{del: (*DeleteResponse)(resp)}, nil
 		}
 		}

+ 3 - 3
clientv3/lease.go

@@ -110,7 +110,7 @@ func NewLease(c *Client) Lease {
 	l := &lessor{
 	l := &lessor{
 		donec:                 make(chan struct{}),
 		donec:                 make(chan struct{}),
 		keepAlives:            make(map[LeaseID]*keepAlive),
 		keepAlives:            make(map[LeaseID]*keepAlive),
-		remote:                pb.NewLeaseClient(c.conn),
+		remote:                RetryLeaseClient(c),
 		firstKeepAliveTimeout: c.cfg.DialTimeout + time.Second,
 		firstKeepAliveTimeout: c.cfg.DialTimeout + time.Second,
 	}
 	}
 	if l.firstKeepAliveTimeout == time.Second {
 	if l.firstKeepAliveTimeout == time.Second {
@@ -130,7 +130,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
 
 
 	for {
 	for {
 		r := &pb.LeaseGrantRequest{TTL: ttl}
 		r := &pb.LeaseGrantRequest{TTL: ttl}
-		resp, err := l.remote.LeaseGrant(cctx, r, grpc.FailFast(false))
+		resp, err := l.remote.LeaseGrant(cctx, r)
 		if err == nil {
 		if err == nil {
 			gresp := &LeaseGrantResponse{
 			gresp := &LeaseGrantResponse{
 				ResponseHeader: resp.GetHeader(),
 				ResponseHeader: resp.GetHeader(),
@@ -156,7 +156,7 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
 
 
 	for {
 	for {
 		r := &pb.LeaseRevokeRequest{ID: int64(id)}
 		r := &pb.LeaseRevokeRequest{ID: int64(id)}
-		resp, err := l.remote.LeaseRevoke(cctx, r, grpc.FailFast(false))
+		resp, err := l.remote.LeaseRevoke(cctx, r)
 
 
 		if err == nil {
 		if err == nil {
 			return (*LeaseRevokeResponse)(resp), nil
 			return (*LeaseRevokeResponse)(resp), nil

+ 243 - 0
clientv3/retry.go

@@ -0,0 +1,243 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+)
+
+type rpcFunc func(ctx context.Context) error
+type retryRpcFunc func(context.Context, rpcFunc)
+
+func (c *Client) newRetryWrapper() retryRpcFunc {
+	return func(rpcCtx context.Context, f rpcFunc) {
+		for {
+			err := f(rpcCtx)
+			// ignore grpc conn closing on fail-fast calls; they are transient errors
+			if err == nil || !isConnClosing(err) {
+				return
+			}
+			select {
+			case <-c.balancer.ConnectNotify():
+			case <-rpcCtx.Done():
+			case <-c.ctx.Done():
+				return
+			}
+		}
+	}
+}
+
+type retryKVClient struct {
+	pb.KVClient
+	retryf retryRpcFunc
+}
+
+// RetryKVClient implements a KVClient that uses the client's FailFast retry policy.
+func RetryKVClient(c *Client) pb.KVClient {
+	return &retryKVClient{pb.NewKVClient(c.conn), c.retryWrapper}
+}
+
+func (rkv *retryKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) {
+	rkv.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rkv.KVClient.Put(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rkv *retryKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) {
+	rkv.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rkv.KVClient.DeleteRange(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rkv *retryKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) {
+	rkv.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rkv.KVClient.Txn(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rkv *retryKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) {
+	rkv.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rkv.KVClient.Compact(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+type retryLeaseClient struct {
+	pb.LeaseClient
+	retryf retryRpcFunc
+}
+
+// RetryLeaseClient implements a LeaseClient that uses the client's FailFast retry policy.
+func RetryLeaseClient(c *Client) pb.LeaseClient {
+	return &retryLeaseClient{pb.NewLeaseClient(c.conn), c.retryWrapper}
+}
+
+func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) {
+	rlc.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rlc.LeaseClient.LeaseGrant(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+
+}
+
+func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (resp *pb.LeaseRevokeResponse, err error) {
+	rlc.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rlc.LeaseClient.LeaseRevoke(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+type retryClusterClient struct {
+	pb.ClusterClient
+	retryf retryRpcFunc
+}
+
+// RetryClusterClient implements a ClusterClient that uses the client's FailFast retry policy.
+func RetryClusterClient(c *Client) pb.ClusterClient {
+	return &retryClusterClient{pb.NewClusterClient(c.conn), c.retryWrapper}
+}
+
+func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) {
+	rcc.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rcc.ClusterClient.MemberAdd(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) {
+	rcc.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rcc.ClusterClient.MemberRemove(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) {
+	rcc.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rcc.ClusterClient.MemberUpdate(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+type retryAuthClient struct {
+	pb.AuthClient
+	retryf retryRpcFunc
+}
+
+// RetryAuthClient implements a AuthClient that uses the client's FailFast retry policy.
+func RetryAuthClient(c *Client) pb.AuthClient {
+	return &retryAuthClient{pb.NewAuthClient(c.conn), c.retryWrapper}
+}
+
+func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) {
+	rac.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rac.AuthClient.AuthEnable(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (resp *pb.AuthDisableResponse, err error) {
+	rac.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rac.AuthClient.AuthDisable(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (resp *pb.AuthUserAddResponse, err error) {
+	rac.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rac.AuthClient.UserAdd(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthUserDeleteResponse, err error) {
+	rac.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rac.AuthClient.UserDelete(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (resp *pb.AuthUserChangePasswordResponse, err error) {
+	rac.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rac.AuthClient.UserChangePassword(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGrantRoleResponse, err error) {
+	rac.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rac.AuthClient.UserGrantRole(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserRevokeRoleResponse, err error) {
+	rac.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rac.AuthClient.UserRevokeRole(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleAddResponse, err error) {
+	rac.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rac.AuthClient.RoleAdd(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleDeleteResponse, err error) {
+	rac.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rac.AuthClient.RoleDelete(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGrantPermissionResponse, err error) {
+	rac.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rac.AuthClient.RoleGrantPermission(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rac *retryAuthClient) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleRevokePermissionResponse, err error) {
+	rac.retryf(ctx, func(rctx context.Context) error {
+		resp, err = rac.AuthClient.RoleRevokePermission(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}

+ 1 - 2
clientv3/txn.go

@@ -19,7 +19,6 @@ import (
 
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
 	"golang.org/x/net/context"
-	"google.golang.org/grpc"
 )
 )
 
 
 // Txn is the interface that wraps mini-transactions.
 // Txn is the interface that wraps mini-transactions.
@@ -153,7 +152,7 @@ func (txn *txn) Commit() (*TxnResponse, error) {
 
 
 func (txn *txn) commit() (*TxnResponse, error) {
 func (txn *txn) commit() (*TxnResponse, error) {
 	r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
 	r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
-	resp, err := txn.kv.remote.Txn(txn.ctx, r, grpc.FailFast(false))
+	resp, err := txn.kv.remote.Txn(txn.ctx, r)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}