|
|
@@ -21,7 +21,6 @@ import (
|
|
|
|
|
|
"github.com/coreos/etcd/clientv3"
|
|
|
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
|
|
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
|
"github.com/coreos/etcd/tools/functional-tester/rpcpb"
|
|
|
|
|
|
"go.uber.org/zap"
|
|
|
@@ -101,22 +100,19 @@ type leaseChecker struct {
|
|
|
lg *zap.Logger
|
|
|
m *rpcpb.Member
|
|
|
ls *leaseStresser
|
|
|
- lsc pb.LeaseClient
|
|
|
- kvc pb.KVClient
|
|
|
+ cli *clientv3.Client
|
|
|
}
|
|
|
|
|
|
func (lc *leaseChecker) Check() error {
|
|
|
- conn, err := lc.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(time.Second))
|
|
|
+ cli, err := lc.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(time.Second))
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("%v (%q)", err, lc.m.EtcdClientEndpoint)
|
|
|
}
|
|
|
defer func() {
|
|
|
- if conn != nil {
|
|
|
- conn.Close()
|
|
|
+ if cli != nil {
|
|
|
+ cli.Close()
|
|
|
}
|
|
|
}()
|
|
|
- lc.kvc = pb.NewKVClient(conn)
|
|
|
- lc.lsc = pb.NewLeaseClient(conn)
|
|
|
if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -148,7 +144,7 @@ func (lc *leaseChecker) checkShortLivedLeases() error {
|
|
|
|
|
|
func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) (err error) {
|
|
|
// retry in case of transient failure or lease is expired but not yet revoked due to the fact that etcd cluster didn't have enought time to delete it.
|
|
|
- var resp *pb.LeaseTimeToLiveResponse
|
|
|
+ var resp *clientv3.LeaseTimeToLiveResponse
|
|
|
for i := 0; i < retries; i++ {
|
|
|
resp, err = lc.getLeaseByID(ctx, leaseID)
|
|
|
// lease not found, for ~v3.1 compatibilities, check ErrLeaseNotFound
|
|
|
@@ -230,9 +226,13 @@ func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) {
|
|
|
- ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true}
|
|
|
- return lc.lsc.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false))
|
|
|
+// TODO: handle failures from "grpc.FailFast(false)"
|
|
|
+func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*clientv3.LeaseTimeToLiveResponse, error) {
|
|
|
+ return lc.cli.TimeToLive(
|
|
|
+ ctx,
|
|
|
+ clientv3.LeaseID(leaseID),
|
|
|
+ clientv3.WithAttachedKeys(),
|
|
|
+ )
|
|
|
}
|
|
|
|
|
|
func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
|
|
|
@@ -261,10 +261,7 @@ func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (boo
|
|
|
// Since the format of keys contains about leaseID, finding keys base on "<leaseID>" prefix
|
|
|
// determines whether the attached keys for a given leaseID has been deleted or not
|
|
|
func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
|
|
|
- resp, err := lc.kvc.Range(ctx, &pb.RangeRequest{
|
|
|
- Key: []byte(fmt.Sprintf("%d", leaseID)),
|
|
|
- RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))),
|
|
|
- }, grpc.FailFast(false))
|
|
|
+ resp, err := lc.cli.Get(ctx, fmt.Sprintf("%d", leaseID), clientv3.WithPrefix())
|
|
|
if err != nil {
|
|
|
lc.lg.Warn(
|
|
|
"hasKeysAttachedToLeaseExpired failed",
|