Browse Source

Merge pull request #8718 from gyuho/qqq

clientv3: remove redundant retries in Lease, set FailFast=true
Gyu-Ho Lee 8 years ago
parent
commit
ad7882590c
2 changed files with 66 additions and 61 deletions
  1. 35 54
      clientv3/lease.go
  2. 31 7
      clientv3/retry.go

+ 35 - 54
clientv3/lease.go

@@ -22,7 +22,6 @@ import (
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 
 
-	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/metadata"
 )
 )
 
 
@@ -183,72 +182,55 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati
 }
 }
 
 
 func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
 func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
-	for {
-		r := &pb.LeaseGrantRequest{TTL: ttl}
-		resp, err := l.remote.LeaseGrant(ctx, r)
-		if err == nil {
-			gresp := &LeaseGrantResponse{
-				ResponseHeader: resp.GetHeader(),
-				ID:             LeaseID(resp.ID),
-				TTL:            resp.TTL,
-				Error:          resp.Error,
-			}
-			return gresp, nil
-		}
-		if isHaltErr(ctx, err) {
-			return nil, toErr(ctx, err)
+	r := &pb.LeaseGrantRequest{TTL: ttl}
+	resp, err := l.remote.LeaseGrant(ctx, r)
+	if err == nil {
+		gresp := &LeaseGrantResponse{
+			ResponseHeader: resp.GetHeader(),
+			ID:             LeaseID(resp.ID),
+			TTL:            resp.TTL,
+			Error:          resp.Error,
 		}
 		}
+		return gresp, nil
 	}
 	}
+	return nil, toErr(ctx, err)
 }
 }
 
 
 func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
 func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
-	for {
-		r := &pb.LeaseRevokeRequest{ID: int64(id)}
-		resp, err := l.remote.LeaseRevoke(ctx, r)
-
-		if err == nil {
-			return (*LeaseRevokeResponse)(resp), nil
-		}
-		if isHaltErr(ctx, err) {
-			return nil, toErr(ctx, err)
-		}
+	r := &pb.LeaseRevokeRequest{ID: int64(id)}
+	resp, err := l.remote.LeaseRevoke(ctx, r)
+	if err == nil {
+		return (*LeaseRevokeResponse)(resp), nil
 	}
 	}
+	return nil, toErr(ctx, err)
 }
 }
 
 
 func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
 func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
-	for {
-		r := toLeaseTimeToLiveRequest(id, opts...)
-		resp, err := l.remote.LeaseTimeToLive(ctx, r, grpc.FailFast(false))
-		if err == nil {
-			gresp := &LeaseTimeToLiveResponse{
-				ResponseHeader: resp.GetHeader(),
-				ID:             LeaseID(resp.ID),
-				TTL:            resp.TTL,
-				GrantedTTL:     resp.GrantedTTL,
-				Keys:           resp.Keys,
-			}
-			return gresp, nil
-		}
-		if isHaltErr(ctx, err) {
-			return nil, toErr(ctx, err)
+	r := toLeaseTimeToLiveRequest(id, opts...)
+	resp, err := l.remote.LeaseTimeToLive(ctx, r)
+	if err == nil {
+		gresp := &LeaseTimeToLiveResponse{
+			ResponseHeader: resp.GetHeader(),
+			ID:             LeaseID(resp.ID),
+			TTL:            resp.TTL,
+			GrantedTTL:     resp.GrantedTTL,
+			Keys:           resp.Keys,
 		}
 		}
+		return gresp, nil
 	}
 	}
+	return nil, toErr(ctx, err)
 }
 }
 
 
 func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
 func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
-	for {
-		resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, grpc.FailFast(false))
-		if err == nil {
-			leases := make([]LeaseStatus, len(resp.Leases))
-			for i := range resp.Leases {
-				leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)}
-			}
-			return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil
-		}
-		if isHaltErr(ctx, err) {
-			return nil, toErr(ctx, err)
+	resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{})
+	if err == nil {
+		leases := make([]LeaseStatus, len(resp.Leases))
+		for i := range resp.Leases {
+			leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)}
 		}
 		}
+		return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil
 	}
 	}
+	return nil, toErr(ctx, err)
 }
 }
 
 
 func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
 func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
@@ -389,7 +371,7 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
 	cctx, cancel := context.WithCancel(ctx)
 	cctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	defer cancel()
 
 
-	stream, err := l.remote.LeaseKeepAlive(cctx, grpc.FailFast(false))
+	stream, err := l.remote.LeaseKeepAlive(cctx)
 	if err != nil {
 	if err != nil {
 		return nil, toErr(ctx, err)
 		return nil, toErr(ctx, err)
 	}
 	}
@@ -433,7 +415,6 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
 		} else {
 		} else {
 			for {
 			for {
 				resp, err := stream.Recv()
 				resp, err := stream.Recv()
-
 				if err != nil {
 				if err != nil {
 					if canceledByCaller(l.stopCtx, err) {
 					if canceledByCaller(l.stopCtx, err) {
 						return err
 						return err
@@ -461,7 +442,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
 // resetRecv opens a new lease stream and starts sending keep alive requests.
 // resetRecv opens a new lease stream and starts sending keep alive requests.
 func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
 func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
 	sctx, cancel := context.WithCancel(l.stopCtx)
 	sctx, cancel := context.WithCancel(l.stopCtx)
-	stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false))
+	stream, err := l.remote.LeaseKeepAlive(sctx)
 	if err != nil {
 	if err != nil {
 		cancel()
 		cancel()
 		return nil, err
 		return nil, err

+ 31 - 7
clientv3/retry.go

@@ -164,11 +164,11 @@ func (rkv *retryWriteKVClient) Compact(ctx context.Context, in *pb.CompactionReq
 }
 }
 
 
 type retryLeaseClient struct {
 type retryLeaseClient struct {
-	pb.LeaseClient
-	readRetry retryRPCFunc
+	lc              pb.LeaseClient
+	repeatableRetry retryRPCFunc
 }
 }
 
 
-// RetryLeaseClient implements a LeaseClient that uses the client's FailFast retry policy.
+// RetryLeaseClient implements a LeaseClient.
 func RetryLeaseClient(c *Client) pb.LeaseClient {
 func RetryLeaseClient(c *Client) pb.LeaseClient {
 	retry := &retryLeaseClient{
 	retry := &retryLeaseClient{
 		pb.NewLeaseClient(c.conn),
 		pb.NewLeaseClient(c.conn),
@@ -177,9 +177,25 @@ func RetryLeaseClient(c *Client) pb.LeaseClient {
 	return &retryLeaseClient{retry, c.newAuthRetryWrapper()}
 	return &retryLeaseClient{retry, c.newAuthRetryWrapper()}
 }
 }
 
 
+func (rlc *retryLeaseClient) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (resp *pb.LeaseTimeToLiveResponse, err error) {
+	err = rlc.repeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rlc.lc.LeaseTimeToLive(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
+func (rlc *retryLeaseClient) LeaseLeases(ctx context.Context, in *pb.LeaseLeasesRequest, opts ...grpc.CallOption) (resp *pb.LeaseLeasesResponse, err error) {
+	err = rlc.repeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rlc.lc.LeaseLeases(rctx, in, opts...)
+		return err
+	})
+	return resp, err
+}
+
 func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) {
 func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) {
-	err = rlc.readRetry(ctx, func(rctx context.Context) error {
-		resp, err = rlc.LeaseClient.LeaseGrant(rctx, in, opts...)
+	err = rlc.repeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rlc.lc.LeaseGrant(rctx, in, opts...)
 		return err
 		return err
 	})
 	})
 	return resp, err
 	return resp, err
@@ -187,13 +203,21 @@ func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRe
 }
 }
 
 
 func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (resp *pb.LeaseRevokeResponse, err error) {
 func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (resp *pb.LeaseRevokeResponse, err error) {
-	err = rlc.readRetry(ctx, func(rctx context.Context) error {
-		resp, err = rlc.LeaseClient.LeaseRevoke(rctx, in, opts...)
+	err = rlc.repeatableRetry(ctx, func(rctx context.Context) error {
+		resp, err = rlc.lc.LeaseRevoke(rctx, in, opts...)
 		return err
 		return err
 	})
 	})
 	return resp, err
 	return resp, err
 }
 }
 
 
+func (rlc *retryLeaseClient) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (stream pb.Lease_LeaseKeepAliveClient, err error) {
+	err = rlc.repeatableRetry(ctx, func(rctx context.Context) error {
+		stream, err = rlc.lc.LeaseKeepAlive(rctx, opts...)
+		return err
+	})
+	return stream, err
+}
+
 type retryClusterClient struct {
 type retryClusterClient struct {
 	pb.ClusterClient
 	pb.ClusterClient
 	writeRetry retryRPCFunc
 	writeRetry retryRPCFunc