checker_lease_expire.go 6.6 KB


  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "context"
  17. "fmt"
  18. "time"
  19. "go.etcd.io/etcd/clientv3"
  20. "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
  21. "go.etcd.io/etcd/functional/rpcpb"
  22. "go.uber.org/zap"
  23. "google.golang.org/grpc"
  24. )
  25. type leaseExpireChecker struct {
  26. ctype rpcpb.Checker
  27. lg *zap.Logger
  28. m *rpcpb.Member
  29. ls *leaseStresser
  30. cli *clientv3.Client
  31. }
  32. func newLeaseExpireChecker(ls *leaseStresser) Checker {
  33. return &leaseExpireChecker{
  34. ctype: rpcpb.Checker_LEASE_EXPIRE,
  35. lg: ls.lg,
  36. m: ls.m,
  37. ls: ls,
  38. }
  39. }
  40. func (lc *leaseExpireChecker) Type() rpcpb.Checker {
  41. return lc.ctype
  42. }
  43. func (lc *leaseExpireChecker) EtcdClientEndpoints() []string {
  44. return []string{lc.m.EtcdClientEndpoint}
  45. }
  46. func (lc *leaseExpireChecker) Check() error {
  47. if lc.ls == nil {
  48. return nil
  49. }
  50. if lc.ls != nil &&
  51. (lc.ls.revokedLeases == nil ||
  52. lc.ls.aliveLeases == nil ||
  53. lc.ls.shortLivedLeases == nil) {
  54. return nil
  55. }
  56. cli, err := lc.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(time.Second))
  57. if err != nil {
  58. return fmt.Errorf("%v (%q)", err, lc.m.EtcdClientEndpoint)
  59. }
  60. defer func() {
  61. if cli != nil {
  62. cli.Close()
  63. }
  64. }()
  65. lc.cli = cli
  66. if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil {
  67. return err
  68. }
  69. if err := lc.check(false, lc.ls.aliveLeases.leases); err != nil {
  70. return err
  71. }
  72. return lc.checkShortLivedLeases()
  73. }
  74. const leaseExpireCheckerTimeout = 10 * time.Second
  75. // checkShortLivedLeases ensures leases expire.
  76. func (lc *leaseExpireChecker) checkShortLivedLeases() error {
  77. ctx, cancel := context.WithTimeout(context.Background(), leaseExpireCheckerTimeout)
  78. errc := make(chan error)
  79. defer cancel()
  80. for leaseID := range lc.ls.shortLivedLeases.leases {
  81. go func(id int64) {
  82. errc <- lc.checkShortLivedLease(ctx, id)
  83. }(leaseID)
  84. }
  85. var errs []error
  86. for range lc.ls.shortLivedLeases.leases {
  87. if err := <-errc; err != nil {
  88. errs = append(errs, err)
  89. }
  90. }
  91. return errsToError(errs)
  92. }
  93. func (lc *leaseExpireChecker) checkShortLivedLease(ctx context.Context, leaseID int64) (err error) {
  94. // retry in case of transient failure or lease is expired but not yet revoked due to the fact that etcd cluster didn't have enought time to delete it.
  95. var resp *clientv3.LeaseTimeToLiveResponse
  96. for i := 0; i < retries; i++ {
  97. resp, err = lc.getLeaseByID(ctx, leaseID)
  98. // lease not found, for ~v3.1 compatibilities, check ErrLeaseNotFound
  99. if (err == nil && resp.TTL == -1) || (err != nil && rpctypes.Error(err) == rpctypes.ErrLeaseNotFound) {
  100. return nil
  101. }
  102. if err != nil {
  103. lc.lg.Debug(
  104. "retrying; Lease TimeToLive failed",
  105. zap.Int("retries", i),
  106. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  107. zap.Error(err),
  108. )
  109. continue
  110. }
  111. if resp.TTL > 0 {
  112. dur := time.Duration(resp.TTL) * time.Second
  113. lc.lg.Debug(
  114. "lease has not been expired, wait until expire",
  115. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  116. zap.Int64("ttl", resp.TTL),
  117. zap.Duration("wait-duration", dur),
  118. )
  119. time.Sleep(dur)
  120. } else {
  121. lc.lg.Debug(
  122. "lease expired but not yet revoked",
  123. zap.Int("retries", i),
  124. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  125. zap.Int64("ttl", resp.TTL),
  126. zap.Duration("wait-duration", time.Second),
  127. )
  128. time.Sleep(time.Second)
  129. }
  130. if err = lc.checkLease(ctx, false, leaseID); err != nil {
  131. continue
  132. }
  133. return nil
  134. }
  135. return err
  136. }
  137. func (lc *leaseExpireChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error {
  138. keysExpired, err := lc.hasKeysAttachedToLeaseExpired(ctx, leaseID)
  139. if err != nil {
  140. lc.lg.Warn(
  141. "hasKeysAttachedToLeaseExpired failed",
  142. zap.String("endpoint", lc.m.EtcdClientEndpoint),
  143. zap.Error(err),
  144. )
  145. return err
  146. }
  147. leaseExpired, err := lc.hasLeaseExpired(ctx, leaseID)
  148. if err != nil {
  149. lc.lg.Warn(
  150. "hasLeaseExpired failed",
  151. zap.String("endpoint", lc.m.EtcdClientEndpoint),
  152. zap.Error(err),
  153. )
  154. return err
  155. }
  156. if leaseExpired != keysExpired {
  157. return fmt.Errorf("lease %v expiration mismatch (lease expired=%v, keys expired=%v)", leaseID, leaseExpired, keysExpired)
  158. }
  159. if leaseExpired != expired {
  160. return fmt.Errorf("lease %v expected expired=%v, got %v", leaseID, expired, leaseExpired)
  161. }
  162. return nil
  163. }
  164. func (lc *leaseExpireChecker) check(expired bool, leases map[int64]time.Time) error {
  165. ctx, cancel := context.WithTimeout(context.Background(), leaseExpireCheckerTimeout)
  166. defer cancel()
  167. for leaseID := range leases {
  168. if err := lc.checkLease(ctx, expired, leaseID); err != nil {
  169. return err
  170. }
  171. }
  172. return nil
  173. }
  174. // TODO: handle failures from "grpc.FailFast(false)"
  175. func (lc *leaseExpireChecker) getLeaseByID(ctx context.Context, leaseID int64) (*clientv3.LeaseTimeToLiveResponse, error) {
  176. return lc.cli.TimeToLive(
  177. ctx,
  178. clientv3.LeaseID(leaseID),
  179. clientv3.WithAttachedKeys(),
  180. )
  181. }
  182. func (lc *leaseExpireChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
  183. // keep retrying until lease's state is known or ctx is being canceled
  184. for ctx.Err() == nil {
  185. resp, err := lc.getLeaseByID(ctx, leaseID)
  186. if err != nil {
  187. // for ~v3.1 compatibilities
  188. if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
  189. return true, nil
  190. }
  191. } else {
  192. return resp.TTL == -1, nil
  193. }
  194. lc.lg.Warn(
  195. "hasLeaseExpired getLeaseByID failed",
  196. zap.String("endpoint", lc.m.EtcdClientEndpoint),
  197. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  198. zap.Error(err),
  199. )
  200. }
  201. return false, ctx.Err()
  202. }
  203. // The keys attached to the lease has the format of "<leaseID>_<idx>" where idx is the ordering key creation
  204. // Since the format of keys contains about leaseID, finding keys base on "<leaseID>" prefix
  205. // determines whether the attached keys for a given leaseID has been deleted or not
  206. func (lc *leaseExpireChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
  207. resp, err := lc.cli.Get(ctx, fmt.Sprintf("%d", leaseID), clientv3.WithPrefix())
  208. if err != nil {
  209. lc.lg.Warn(
  210. "hasKeysAttachedToLeaseExpired failed",
  211. zap.String("endpoint", lc.m.EtcdClientEndpoint),
  212. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  213. zap.Error(err),
  214. )
  215. return false, err
  216. }
  217. return len(resp.Kvs) == 0, nil
  218. }