|
|
@@ -18,6 +18,8 @@ import (
|
|
|
"fmt"
|
|
|
"time"
|
|
|
|
|
|
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
|
|
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
|
"golang.org/x/net/context"
|
|
|
)
|
|
|
|
|
|
@@ -87,28 +89,83 @@ func (lc *leaseChecker) Check() error {
|
|
|
return err
|
|
|
}
|
|
|
plog.Infof("checking short lived leases %v", lc.ls.shortLivedLeases.leases)
|
|
|
- return lc.check(true, lc.ls.shortLivedLeases.leases)
|
|
|
+ return lc.checkShortLivedLeases()
|
|
|
+
|
|
|
}
|
|
|
|
|
|
-func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error {
|
|
|
+// checkShortLivedLeases ensures leases expire.
|
|
|
+func (lc *leaseChecker) checkShortLivedLeases() error {
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout)
|
|
|
+ errc := make(chan error)
|
|
|
defer cancel()
|
|
|
- for leaseID := range leases {
|
|
|
- keysExpired, err := lc.ls.hasKeysAttachedToLeaseExpired(ctx, leaseID)
|
|
|
- if err != nil {
|
|
|
- plog.Errorf("hasKeysAttachedToLeaseExpired error: (%v)", err)
|
|
|
- return err
|
|
|
+ for leaseID := range lc.ls.shortLivedLeases.leases {
|
|
|
+ go func(id int64) {
|
|
|
+ errc <- lc.checkShortLivedLease(ctx, id)
|
|
|
+ }(leaseID)
|
|
|
+ }
|
|
|
+
|
|
|
+ var errs []error
|
|
|
+ for range lc.ls.shortLivedLeases.leases {
|
|
|
+ if err := <-errc; err != nil {
|
|
|
+ errs = append(errs, err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return errsToError(errs)
|
|
|
+}
|
|
|
+
|
|
|
+func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) (err error) {
|
|
|
+ // retry in case of transient failure or lease is expired but not yet revoked due to the fact that etcd cluster didn't have enought time to delete it.
|
|
|
+ var resp *pb.LeaseTimeToLiveResponse
|
|
|
+ for i := 0; i < retries; i++ {
|
|
|
+ resp, err = lc.ls.getLeaseByID(ctx, leaseID)
|
|
|
+ if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
|
|
|
+ return nil
|
|
|
}
|
|
|
- leaseExpired, err := lc.ls.hasLeaseExpired(ctx, leaseID)
|
|
|
if err != nil {
|
|
|
- plog.Errorf("hasLeaseExpired error: (%v)", err)
|
|
|
- return err
|
|
|
+ plog.Warningf("retry %d. failed to retrieve lease %v error (%v)", i, leaseID, err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if resp.TTL > 0 {
|
|
|
+ plog.Warningf("lease %v is not expired. sleep for %d until it expires.", leaseID, resp.TTL)
|
|
|
+ time.Sleep(time.Duration(resp.TTL) * time.Second)
|
|
|
+ } else {
|
|
|
+ plog.Warningf("retry %d. lease %v is expired but not yet revoked", i, leaseID)
|
|
|
+ time.Sleep(time.Second)
|
|
|
}
|
|
|
- if leaseExpired != keysExpired {
|
|
|
- return fmt.Errorf("lease %v expiration mismatch (lease expired=%v, keys expired=%v)", leaseID, leaseExpired, keysExpired)
|
|
|
+ if err = lc.checkLease(ctx, false, leaseID); err != nil {
|
|
|
+ continue
|
|
|
}
|
|
|
- if leaseExpired != expired {
|
|
|
- return fmt.Errorf("lease %v expected expired=%v, got %v", leaseID, expired, leaseExpired)
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return err
|
|
|
+}
|
|
|
+
|
|
|
+func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error {
|
|
|
+ keysExpired, err := lc.ls.hasKeysAttachedToLeaseExpired(ctx, leaseID)
|
|
|
+ if err != nil {
|
|
|
+ plog.Errorf("hasKeysAttachedToLeaseExpired error: (%v)", err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ leaseExpired, err := lc.ls.hasLeaseExpired(ctx, leaseID)
|
|
|
+ if err != nil {
|
|
|
+ plog.Errorf("hasLeaseExpired error: (%v)", err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if leaseExpired != keysExpired {
|
|
|
+ return fmt.Errorf("lease %v expiration mismatch (lease expired=%v, keys expired=%v)", leaseID, leaseExpired, keysExpired)
|
|
|
+ }
|
|
|
+ if leaseExpired != expired {
|
|
|
+ return fmt.Errorf("lease %v expected expired=%v, got %v", leaseID, expired, leaseExpired)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error {
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout)
|
|
|
+ defer cancel()
|
|
|
+ for leaseID := range leases {
|
|
|
+ if err := lc.checkLease(ctx, expired, leaseID); err != nil {
|
|
|
+ return err
|
|
|
}
|
|
|
}
|
|
|
return nil
|