Browse Source

Merge pull request #8717 from gyuho/retry-cleanup

clientv3: clean up retry wrapper, remove all FailFast=false
Gyu-Ho Lee 8 years ago
parent
commit
40b6fcd761
7 changed files with 253 additions and 164 deletions
  1. 8 8
      clientv3/auth.go
  2. 9 18
      clientv3/cluster.go
  3. 2 21
      clientv3/kv.go
  4. 12 18
      clientv3/maintenance.go
  5. 216 77
      clientv3/retry.go
  6. 4 22
      clientv3/txn.go
  7. 2 0
      clientv3/watch.go

+ 8 - 8
clientv3/auth.go

@@ -105,16 +105,16 @@ type auth struct {
 }
 
 func NewAuth(c *Client) Auth {
-	return &auth{remote: pb.NewAuthClient(c.ActiveConnection())}
+	return &auth{remote: RetryAuthClient(c)}
 }
 
 func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
-	resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, grpc.FailFast(false))
+	resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{})
 	return (*AuthEnableResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
-	resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, grpc.FailFast(false))
+	resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{})
 	return (*AuthDisableResponse)(resp), toErr(ctx, err)
 }
 
@@ -139,12 +139,12 @@ func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (
 }
 
 func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
-	resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, grpc.FailFast(false))
+	resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name})
 	return (*AuthUserGetResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) {
-	resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, grpc.FailFast(false))
+	resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{})
 	return (*AuthUserListResponse)(resp), toErr(ctx, err)
 }
 
@@ -169,12 +169,12 @@ func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, ran
 }
 
 func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
-	resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, grpc.FailFast(false))
+	resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role})
 	return (*AuthRoleGetResponse)(resp), toErr(ctx, err)
 }
 
 func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
-	resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, grpc.FailFast(false))
+	resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{})
 	return (*AuthRoleListResponse)(resp), toErr(ctx, err)
 }
 
@@ -202,7 +202,7 @@ type authenticator struct {
 }
 
 func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
-	resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, grpc.FailFast(false))
+	resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password})
 	return (*AuthenticateResponse)(resp), toErr(ctx, err)
 }
 

+ 9 - 18
clientv3/cluster.go

@@ -18,7 +18,6 @@ import (
 	"context"
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
-	"google.golang.org/grpc"
 )
 
 type (
@@ -75,27 +74,19 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes
 
 func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
 	// it is safe to retry on update.
-	for {
-		r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
-		resp, err := c.remote.MemberUpdate(ctx, r, grpc.FailFast(false))
-		if err == nil {
-			return (*MemberUpdateResponse)(resp), nil
-		}
-		if isHaltErr(ctx, err) {
-			return nil, toErr(ctx, err)
-		}
+	r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
+	resp, err := c.remote.MemberUpdate(ctx, r)
+	if err == nil {
+		return (*MemberUpdateResponse)(resp), nil
 	}
+	return nil, toErr(ctx, err)
 }
 
 func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
 	// it is safe to retry on list.
-	for {
-		resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}, grpc.FailFast(false))
-		if err == nil {
-			return (*MemberListResponse)(resp), nil
-		}
-		if isHaltErr(ctx, err) {
-			return nil, toErr(ctx, err)
-		}
+	resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{})
+	if err == nil {
+		return (*MemberListResponse)(resp), nil
 	}
+	return nil, toErr(ctx, err)
 }

+ 2 - 21
clientv3/kv.go

@@ -18,8 +18,6 @@ import (
 	"context"
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
-
-	"google.golang.org/grpc"
 )
 
 type (
@@ -132,28 +130,11 @@ func (kv *kv) Txn(ctx context.Context) Txn {
 }
 
 func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
-	for {
-		resp, err := kv.do(ctx, op)
-		if err == nil {
-			return resp, nil
-		}
-
-		if isHaltErr(ctx, err) {
-			return resp, toErr(ctx, err)
-		}
-		// do not retry on modifications
-		if op.isWrite() {
-			return resp, toErr(ctx, err)
-		}
-	}
-}
-
-func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
 	var err error
 	switch op.t {
 	case tRange:
 		var resp *pb.RangeResponse
-		resp, err = kv.remote.Range(ctx, op.toRangeRequest(), grpc.FailFast(false))
+		resp, err = kv.remote.Range(ctx, op.toRangeRequest())
 		if err == nil {
 			return OpResponse{get: (*GetResponse)(resp)}, nil
 		}
@@ -180,5 +161,5 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
 	default:
 		panic("Unknown op")
 	}
-	return OpResponse{}, err
+	return OpResponse{}, toErr(ctx, err)
 }

+ 12 - 18
clientv3/maintenance.go

@@ -19,8 +19,6 @@ import (
 	"io"
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
-
-	"google.golang.org/grpc"
 )
 
 type (
@@ -77,9 +75,9 @@ func NewMaintenance(c *Client) Maintenance {
 				return nil, nil, err
 			}
 			cancel := func() { conn.Close() }
-			return pb.NewMaintenanceClient(conn), cancel, nil
+			return RetryMaintenanceClient(c, conn), cancel, nil
 		},
-		remote: pb.NewMaintenanceClient(c.conn),
+		remote: RetryMaintenanceClient(c, c.conn),
 	}
 }
 
@@ -98,15 +96,11 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
 		MemberID: 0,                 // all
 		Alarm:    pb.AlarmType_NONE, // all
 	}
-	for {
-		resp, err := m.remote.Alarm(ctx, req, grpc.FailFast(false))
-		if err == nil {
-			return (*AlarmResponse)(resp), nil
-		}
-		if isHaltErr(ctx, err) {
-			return nil, toErr(ctx, err)
-		}
+	resp, err := m.remote.Alarm(ctx, req)
+	if err == nil {
+		return (*AlarmResponse)(resp), nil
 	}
+	return nil, toErr(ctx, err)
 }
 
 func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) {
@@ -132,7 +126,7 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR
 		return &ret, nil
 	}
 
-	resp, err := m.remote.Alarm(ctx, req, grpc.FailFast(false))
+	resp, err := m.remote.Alarm(ctx, req)
 	if err == nil {
 		return (*AlarmResponse)(resp), nil
 	}
@@ -145,7 +139,7 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm
 		return nil, toErr(ctx, err)
 	}
 	defer cancel()
-	resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, grpc.FailFast(false))
+	resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{})
 	if err != nil {
 		return nil, toErr(ctx, err)
 	}
@@ -158,7 +152,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo
 		return nil, toErr(ctx, err)
 	}
 	defer cancel()
-	resp, err := remote.Status(ctx, &pb.StatusRequest{}, grpc.FailFast(false))
+	resp, err := remote.Status(ctx, &pb.StatusRequest{})
 	if err != nil {
 		return nil, toErr(ctx, err)
 	}
@@ -171,7 +165,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*
 		return nil, toErr(ctx, err)
 	}
 	defer cancel()
-	resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, grpc.FailFast(false))
+	resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev})
 	if err != nil {
 		return nil, toErr(ctx, err)
 	}
@@ -179,7 +173,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*
 }
 
 func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
-	ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, grpc.FailFast(false))
+	ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{})
 	if err != nil {
 		return nil, toErr(ctx, err)
 	}
@@ -206,6 +200,6 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
 }
 
 func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
-	resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, grpc.FailFast(false))
+	resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID})
 	return (*MoveLeaderResponse)(resp), toErr(ctx, err)
 }

+ 216 - 77
clientv3/retry.go

@@ -29,7 +29,7 @@ type rpcFunc func(ctx context.Context) error
 type retryRPCFunc func(context.Context, rpcFunc) error
 type retryStopErrFunc func(error) bool
 
-func isReadStopError(err error) bool {
+func isRepeatableStopError(err error) bool {
 	eErr := rpctypes.Error(err)
 	// always stop retry on etcd errors
 	if _, ok := eErr.(rpctypes.EtcdError); ok {
@@ -40,7 +40,7 @@ func isReadStopError(err error) bool {
 	return ev.Code() != codes.Unavailable
 }
 
-func isWriteStopError(err error) bool {
+func isNonRepeatableStopError(err error) bool {
 	ev, _ := status.FromError(err)
 	if ev.Code() != codes.Unavailable {
 		return true
@@ -101,63 +101,64 @@ func (c *Client) newAuthRetryWrapper() retryRPCFunc {
 	}
 }
 
-// RetryKVClient implements a KVClient that uses the client's FailFast retry policy.
+// RetryKVClient implements a KVClient.
 func RetryKVClient(c *Client) pb.KVClient {
-	readRetry := c.newRetryWrapper(isReadStopError)
-	writeRetry := c.newRetryWrapper(isWriteStopError)
+	repeatableRetry := c.newRetryWrapper(isRepeatableStopError)
+	nonRepeatableRetry := c.newRetryWrapper(isNonRepeatableStopError)
 	conn := pb.NewKVClient(c.conn)
-	retryBasic := &retryKVClient{&retryWriteKVClient{conn, writeRetry}, readRetry}
+	retryBasic := &retryKVClient{&nonRepeatableKVClient{conn, nonRepeatableRetry}, repeatableRetry}
 	retryAuthWrapper := c.newAuthRetryWrapper()
 	return &retryKVClient{
-		&retryWriteKVClient{retryBasic, retryAuthWrapper},
+		&nonRepeatableKVClient{retryBasic, retryAuthWrapper},
 		retryAuthWrapper}
 }
 
 type retryKVClient struct {
-	*retryWriteKVClient
-	readRetry retryRPCFunc
+	*nonRepeatableKVClient
+	repeatableRetry retryRPCFunc
 }
 
 func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) {
-	err = rkv.readRetry(ctx, func(rctx context.Context) error {
-		resp, err = rkv.KVClient.Range(rctx, in, opts...)
+	err = rkv.repeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rkv.kc.Range(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-type retryWriteKVClient struct {
-	pb.KVClient
-	writeRetry retryRPCFunc
+type nonRepeatableKVClient struct {
+	kc                 pb.KVClient
+	nonRepeatableRetry retryRPCFunc
 }
 
-func (rkv *retryWriteKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) {
-	err = rkv.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rkv.KVClient.Put(rctx, in, opts...)
+func (rkv *nonRepeatableKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) {
+	err = rkv.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rkv.kc.Put(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rkv *retryWriteKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) {
-	err = rkv.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rkv.KVClient.DeleteRange(rctx, in, opts...)
+func (rkv *nonRepeatableKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) {
+	err = rkv.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rkv.kc.DeleteRange(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rkv *retryWriteKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) {
-	err = rkv.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rkv.KVClient.Txn(rctx, in, opts...)
+func (rkv *nonRepeatableKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) {
+	// TODO: repeatableRetry if read-only txn
+	err = rkv.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rkv.kc.Txn(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rkv *retryWriteKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) {
-	err = rkv.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rkv.KVClient.Compact(rctx, in, opts...)
+func (rkv *nonRepeatableKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) {
+	err = rkv.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rkv.kc.Compact(rctx, in, opts...)
 		return err
 	})
 	return resp, err
@@ -172,7 +173,7 @@ type retryLeaseClient struct {
 func RetryLeaseClient(c *Client) pb.LeaseClient {
 	retry := &retryLeaseClient{
 		pb.NewLeaseClient(c.conn),
-		c.newRetryWrapper(isReadStopError),
+		c.newRetryWrapper(isRepeatableStopError),
 	}
 	return &retryLeaseClient{retry, c.newAuthRetryWrapper()}
 }
@@ -219,132 +220,270 @@ func (rlc *retryLeaseClient) LeaseKeepAlive(ctx context.Context, opts ...grpc.Ca
 }
 
 type retryClusterClient struct {
-	pb.ClusterClient
-	writeRetry retryRPCFunc
+	*nonRepeatableClusterClient
+	repeatableRetry retryRPCFunc
 }
 
-// RetryClusterClient implements a ClusterClient that uses the client's FailFast retry policy.
+// RetryClusterClient implements a ClusterClient.
 func RetryClusterClient(c *Client) pb.ClusterClient {
-	return &retryClusterClient{pb.NewClusterClient(c.conn), c.newRetryWrapper(isWriteStopError)}
+	repeatableRetry := c.newRetryWrapper(isRepeatableStopError)
+	nonRepeatableRetry := c.newRetryWrapper(isNonRepeatableStopError)
+	cc := pb.NewClusterClient(c.conn)
+	return &retryClusterClient{&nonRepeatableClusterClient{cc, nonRepeatableRetry}, repeatableRetry}
+}
+
+func (rcc *retryClusterClient) MemberList(ctx context.Context, in *pb.MemberListRequest, opts ...grpc.CallOption) (resp *pb.MemberListResponse, err error) {
+	err = rcc.repeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rcc.cc.MemberList(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+type nonRepeatableClusterClient struct {
+	cc                 pb.ClusterClient
+	nonRepeatableRetry retryRPCFunc
+}
+
+func (rcc *nonRepeatableClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) {
+	err = rcc.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rcc.cc.MemberAdd(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rcc *nonRepeatableClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) {
+	err = rcc.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rcc.cc.MemberRemove(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rcc *nonRepeatableClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) {
+	err = rcc.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rcc.cc.MemberUpdate(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+// RetryMaintenanceClient implements a MaintenanceClient.
+func RetryMaintenanceClient(c *Client, conn *grpc.ClientConn) pb.MaintenanceClient {
+	repeatableRetry := c.newRetryWrapper(isRepeatableStopError)
+	nonRepeatableRetry := c.newRetryWrapper(isNonRepeatableStopError)
+	mc := pb.NewMaintenanceClient(conn)
+	return &retryMaintenanceClient{&nonRepeatableMaintenanceClient{mc, nonRepeatableRetry}, repeatableRetry}
+}
+
+type retryMaintenanceClient struct {
+	*nonRepeatableMaintenanceClient
+	repeatableRetry retryRPCFunc
+}
+
+func (rmc *retryMaintenanceClient) Alarm(ctx context.Context, in *pb.AlarmRequest, opts ...grpc.CallOption) (resp *pb.AlarmResponse, err error) {
+	err = rmc.repeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rmc.mc.Alarm(rctx, in, opts...)
+		return err
+	})
+	return resp, err
 }
 
-func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) {
-	err = rcc.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rcc.ClusterClient.MemberAdd(rctx, in, opts...)
+func (rmc *retryMaintenanceClient) Status(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (resp *pb.StatusResponse, err error) {
+	err = rmc.repeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rmc.mc.Status(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) {
-	err = rcc.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rcc.ClusterClient.MemberRemove(rctx, in, opts...)
+func (rmc *retryMaintenanceClient) Hash(ctx context.Context, in *pb.HashRequest, opts ...grpc.CallOption) (resp *pb.HashResponse, err error) {
+	err = rmc.repeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rmc.mc.Hash(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) {
-	err = rcc.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rcc.ClusterClient.MemberUpdate(rctx, in, opts...)
+func (rmc *retryMaintenanceClient) HashKV(ctx context.Context, in *pb.HashKVRequest, opts ...grpc.CallOption) (resp *pb.HashKVResponse, err error) {
+	err = rmc.repeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rmc.mc.HashKV(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rmc *retryMaintenanceClient) Snapshot(ctx context.Context, in *pb.SnapshotRequest, opts ...grpc.CallOption) (stream pb.Maintenance_SnapshotClient, err error) {
+	err = rmc.repeatableRetry(ctx, func(rctx context.Context) error {
+		stream, err = rmc.mc.Snapshot(rctx, in, opts...)
+		return err
+	})
+	return stream, err
+}
+
+func (rmc *retryMaintenanceClient) MoveLeader(ctx context.Context, in *pb.MoveLeaderRequest, opts ...grpc.CallOption) (resp *pb.MoveLeaderResponse, err error) {
+	err = rmc.repeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rmc.mc.MoveLeader(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+type nonRepeatableMaintenanceClient struct {
+	mc                 pb.MaintenanceClient
+	nonRepeatableRetry retryRPCFunc
+}
+
+func (rmc *nonRepeatableMaintenanceClient) Defragment(ctx context.Context, in *pb.DefragmentRequest, opts ...grpc.CallOption) (resp *pb.DefragmentResponse, err error) {
+	err = rmc.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rmc.mc.Defragment(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
 type retryAuthClient struct {
-	pb.AuthClient
-	writeRetry retryRPCFunc
+	*nonRepeatableAuthClient
+	repeatableRetry retryRPCFunc
 }
 
-// RetryAuthClient implements a AuthClient that uses the client's FailFast retry policy.
+// RetryAuthClient implements an AuthClient.
 func RetryAuthClient(c *Client) pb.AuthClient {
-	return &retryAuthClient{pb.NewAuthClient(c.conn), c.newRetryWrapper(isWriteStopError)}
+	repeatableRetry := c.newRetryWrapper(isRepeatableStopError)
+	nonRepeatableRetry := c.newRetryWrapper(isNonRepeatableStopError)
+	ac := pb.NewAuthClient(c.conn)
+	return &retryAuthClient{&nonRepeatableAuthClient{ac, nonRepeatableRetry}, repeatableRetry}
+}
+
+func (rac *retryAuthClient) UserList(ctx context.Context, in *pb.AuthUserListRequest, opts ...grpc.CallOption) (resp *pb.AuthUserListResponse, err error) {
+	err = rac.repeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.ac.UserList(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rac *retryAuthClient) UserGet(ctx context.Context, in *pb.AuthUserGetRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGetResponse, err error) {
+	err = rac.repeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.ac.UserGet(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rac *retryAuthClient) RoleGet(ctx context.Context, in *pb.AuthRoleGetRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGetResponse, err error) {
+	err = rac.repeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.ac.RoleGet(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rac *retryAuthClient) RoleList(ctx context.Context, in *pb.AuthRoleListRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleListResponse, err error) {
+	err = rac.repeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.ac.RoleList(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+type nonRepeatableAuthClient struct {
+	ac                 pb.AuthClient
+	nonRepeatableRetry retryRPCFunc
+}
+
+func (rac *nonRepeatableAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) {
+	err = rac.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.ac.AuthEnable(rctx, in, opts...)
+		return err
+	})
+	return resp, err
 }
 
-func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) {
-	err = rac.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rac.AuthClient.AuthEnable(rctx, in, opts...)
+func (rac *nonRepeatableAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (resp *pb.AuthDisableResponse, err error) {
+	err = rac.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.ac.AuthDisable(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (resp *pb.AuthDisableResponse, err error) {
-	err = rac.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rac.AuthClient.AuthDisable(rctx, in, opts...)
+func (rac *nonRepeatableAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (resp *pb.AuthUserAddResponse, err error) {
+	err = rac.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.ac.UserAdd(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (resp *pb.AuthUserAddResponse, err error) {
-	err = rac.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rac.AuthClient.UserAdd(rctx, in, opts...)
+func (rac *nonRepeatableAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthUserDeleteResponse, err error) {
+	err = rac.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.ac.UserDelete(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthUserDeleteResponse, err error) {
-	err = rac.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rac.AuthClient.UserDelete(rctx, in, opts...)
+func (rac *nonRepeatableAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (resp *pb.AuthUserChangePasswordResponse, err error) {
+	err = rac.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.ac.UserChangePassword(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (resp *pb.AuthUserChangePasswordResponse, err error) {
-	err = rac.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rac.AuthClient.UserChangePassword(rctx, in, opts...)
+func (rac *nonRepeatableAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGrantRoleResponse, err error) {
+	err = rac.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.ac.UserGrantRole(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGrantRoleResponse, err error) {
-	err = rac.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rac.AuthClient.UserGrantRole(rctx, in, opts...)
+func (rac *nonRepeatableAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserRevokeRoleResponse, err error) {
+	err = rac.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.ac.UserRevokeRole(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserRevokeRoleResponse, err error) {
-	err = rac.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rac.AuthClient.UserRevokeRole(rctx, in, opts...)
+func (rac *nonRepeatableAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleAddResponse, err error) {
+	err = rac.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.ac.RoleAdd(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleAddResponse, err error) {
-	err = rac.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rac.AuthClient.RoleAdd(rctx, in, opts...)
+func (rac *nonRepeatableAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleDeleteResponse, err error) {
+	err = rac.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.ac.RoleDelete(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleDeleteResponse, err error) {
-	err = rac.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rac.AuthClient.RoleDelete(rctx, in, opts...)
+func (rac *nonRepeatableAuthClient) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGrantPermissionResponse, err error) {
+	err = rac.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.ac.RoleGrantPermission(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGrantPermissionResponse, err error) {
-	err = rac.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rac.AuthClient.RoleGrantPermission(rctx, in, opts...)
+func (rac *nonRepeatableAuthClient) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleRevokePermissionResponse, err error) {
+	err = rac.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.ac.RoleRevokePermission(rctx, in, opts...)
 		return err
 	})
 	return resp, err
 }
 
-func (rac *retryAuthClient) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleRevokePermissionResponse, err error) {
-	err = rac.writeRetry(ctx, func(rctx context.Context) error {
-		resp, err = rac.AuthClient.RoleRevokePermission(rctx, in, opts...)
+func (rac *nonRepeatableAuthClient) Authenticate(ctx context.Context, in *pb.AuthenticateRequest, opts ...grpc.CallOption) (resp *pb.AuthenticateResponse, err error) {
+	err = rac.nonRepeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rac.ac.Authenticate(rctx, in, opts...)
 		return err
 	})
 	return resp, err

+ 4 - 22
clientv3/txn.go

@@ -19,8 +19,6 @@ import (
 	"sync"
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
-
-	"google.golang.org/grpc"
 )
 
 // Txn is the interface that wraps mini-transactions.
@@ -136,30 +134,14 @@ func (txn *txn) Else(ops ...Op) Txn {
 func (txn *txn) Commit() (*TxnResponse, error) {
 	txn.mu.Lock()
 	defer txn.mu.Unlock()
-	for {
-		resp, err := txn.commit()
-		if err == nil {
-			return resp, err
-		}
-		if isHaltErr(txn.ctx, err) {
-			return nil, toErr(txn.ctx, err)
-		}
-		if txn.isWrite {
-			return nil, toErr(txn.ctx, err)
-		}
-	}
-}
 
-func (txn *txn) commit() (*TxnResponse, error) {
 	r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
 
-	var opts []grpc.CallOption
-	if !txn.isWrite {
-		opts = []grpc.CallOption{grpc.FailFast(false)}
-	}
-	resp, err := txn.kv.remote.Txn(txn.ctx, r, opts...)
+	var resp *pb.TxnResponse
+	var err error
+	resp, err = txn.kv.remote.Txn(txn.ctx, r)
 	if err != nil {
-		return nil, err
+		return nil, toErr(txn.ctx, err)
 	}
 	return (*TxnResponse)(resp), nil
 }

+ 2 - 0
clientv3/watch.go

@@ -762,6 +762,8 @@ func (w *watchGrpcStream) joinSubstreams() {
 }
 
 // openWatchClient retries opening a watch client until success or halt.
+// It retries manually to handle the case "ws==nil && err==nil".
+// TODO: remove FailFast=false
 func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
 	for {
 		select {