checks.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "context"
  17. "fmt"
  18. "time"
  19. "github.com/coreos/etcd/clientv3"
  20. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  21. "github.com/coreos/etcd/functional/rpcpb"
  22. "go.uber.org/zap"
  23. "google.golang.org/grpc"
  24. )
  25. const retries = 7
  26. // Checker checks cluster consistency.
  27. type Checker interface {
  28. // Check returns an error if the system fails a consistency check.
  29. Check() error
  30. }
  31. type hashAndRevGetter interface {
  32. getRevisionHash() (revs map[string]int64, hashes map[string]int64, err error)
  33. }
  34. type hashChecker struct {
  35. lg *zap.Logger
  36. hrg hashAndRevGetter
  37. }
  38. func newHashChecker(lg *zap.Logger, hrg hashAndRevGetter) Checker {
  39. return &hashChecker{
  40. lg: lg,
  41. hrg: hrg,
  42. }
  43. }
  44. const leaseCheckerTimeout = 10 * time.Second
  45. func (hc *hashChecker) checkRevAndHashes() (err error) {
  46. var (
  47. revs map[string]int64
  48. hashes map[string]int64
  49. )
  50. // retries in case of transient failure or etcd cluster has not stablized yet.
  51. for i := 0; i < retries; i++ {
  52. revs, hashes, err = hc.hrg.getRevisionHash()
  53. if err != nil {
  54. hc.lg.Warn(
  55. "failed to get revision and hash",
  56. zap.Int("retries", i),
  57. zap.Error(err),
  58. )
  59. } else {
  60. sameRev := getSameValue(revs)
  61. sameHashes := getSameValue(hashes)
  62. if sameRev && sameHashes {
  63. return nil
  64. }
  65. hc.lg.Warn(
  66. "retrying; etcd cluster is not stable",
  67. zap.Int("retries", i),
  68. zap.Bool("same-revisions", sameRev),
  69. zap.Bool("same-hashes", sameHashes),
  70. zap.String("revisions", fmt.Sprintf("%+v", revs)),
  71. zap.String("hashes", fmt.Sprintf("%+v", hashes)),
  72. )
  73. }
  74. time.Sleep(time.Second)
  75. }
  76. if err != nil {
  77. return fmt.Errorf("failed revision and hash check (%v)", err)
  78. }
  79. return fmt.Errorf("etcd cluster is not stable: [revisions: %v] and [hashes: %v]", revs, hashes)
  80. }
  81. func (hc *hashChecker) Check() error {
  82. return hc.checkRevAndHashes()
  83. }
  84. type leaseChecker struct {
  85. lg *zap.Logger
  86. m *rpcpb.Member
  87. ls *leaseStresser
  88. cli *clientv3.Client
  89. }
  90. func (lc *leaseChecker) Check() error {
  91. if lc.ls == nil {
  92. return nil
  93. }
  94. if lc.ls != nil &&
  95. (lc.ls.revokedLeases == nil ||
  96. lc.ls.aliveLeases == nil ||
  97. lc.ls.shortLivedLeases == nil) {
  98. return nil
  99. }
  100. cli, err := lc.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(time.Second))
  101. if err != nil {
  102. return fmt.Errorf("%v (%q)", err, lc.m.EtcdClientEndpoint)
  103. }
  104. defer func() {
  105. if cli != nil {
  106. cli.Close()
  107. }
  108. }()
  109. lc.cli = cli
  110. if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil {
  111. return err
  112. }
  113. if err := lc.check(false, lc.ls.aliveLeases.leases); err != nil {
  114. return err
  115. }
  116. return lc.checkShortLivedLeases()
  117. }
  118. // checkShortLivedLeases ensures leases expire.
  119. func (lc *leaseChecker) checkShortLivedLeases() error {
  120. ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout)
  121. errc := make(chan error)
  122. defer cancel()
  123. for leaseID := range lc.ls.shortLivedLeases.leases {
  124. go func(id int64) {
  125. errc <- lc.checkShortLivedLease(ctx, id)
  126. }(leaseID)
  127. }
  128. var errs []error
  129. for range lc.ls.shortLivedLeases.leases {
  130. if err := <-errc; err != nil {
  131. errs = append(errs, err)
  132. }
  133. }
  134. return errsToError(errs)
  135. }
  136. func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) (err error) {
  137. // retry in case of transient failure or lease is expired but not yet revoked due to the fact that etcd cluster didn't have enought time to delete it.
  138. var resp *clientv3.LeaseTimeToLiveResponse
  139. for i := 0; i < retries; i++ {
  140. resp, err = lc.getLeaseByID(ctx, leaseID)
  141. // lease not found, for ~v3.1 compatibilities, check ErrLeaseNotFound
  142. if (err == nil && resp.TTL == -1) || (err != nil && rpctypes.Error(err) == rpctypes.ErrLeaseNotFound) {
  143. return nil
  144. }
  145. if err != nil {
  146. lc.lg.Debug(
  147. "retrying; Lease TimeToLive failed",
  148. zap.Int("retries", i),
  149. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  150. zap.Error(err),
  151. )
  152. continue
  153. }
  154. if resp.TTL > 0 {
  155. dur := time.Duration(resp.TTL) * time.Second
  156. lc.lg.Debug(
  157. "lease has not been expired, wait until expire",
  158. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  159. zap.Int64("ttl", resp.TTL),
  160. zap.Duration("wait-duration", dur),
  161. )
  162. time.Sleep(dur)
  163. } else {
  164. lc.lg.Debug(
  165. "lease expired but not yet revoked",
  166. zap.Int("retries", i),
  167. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  168. zap.Int64("ttl", resp.TTL),
  169. zap.Duration("wait-duration", time.Second),
  170. )
  171. time.Sleep(time.Second)
  172. }
  173. if err = lc.checkLease(ctx, false, leaseID); err != nil {
  174. continue
  175. }
  176. return nil
  177. }
  178. return err
  179. }
  180. func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error {
  181. keysExpired, err := lc.hasKeysAttachedToLeaseExpired(ctx, leaseID)
  182. if err != nil {
  183. lc.lg.Warn(
  184. "hasKeysAttachedToLeaseExpired failed",
  185. zap.String("endpoint", lc.m.EtcdClientEndpoint),
  186. zap.Error(err),
  187. )
  188. return err
  189. }
  190. leaseExpired, err := lc.hasLeaseExpired(ctx, leaseID)
  191. if err != nil {
  192. lc.lg.Warn(
  193. "hasLeaseExpired failed",
  194. zap.String("endpoint", lc.m.EtcdClientEndpoint),
  195. zap.Error(err),
  196. )
  197. return err
  198. }
  199. if leaseExpired != keysExpired {
  200. return fmt.Errorf("lease %v expiration mismatch (lease expired=%v, keys expired=%v)", leaseID, leaseExpired, keysExpired)
  201. }
  202. if leaseExpired != expired {
  203. return fmt.Errorf("lease %v expected expired=%v, got %v", leaseID, expired, leaseExpired)
  204. }
  205. return nil
  206. }
  207. func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error {
  208. ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout)
  209. defer cancel()
  210. for leaseID := range leases {
  211. if err := lc.checkLease(ctx, expired, leaseID); err != nil {
  212. return err
  213. }
  214. }
  215. return nil
  216. }
  217. // TODO: handle failures from "grpc.FailFast(false)"
  218. func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*clientv3.LeaseTimeToLiveResponse, error) {
  219. return lc.cli.TimeToLive(
  220. ctx,
  221. clientv3.LeaseID(leaseID),
  222. clientv3.WithAttachedKeys(),
  223. )
  224. }
  225. func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
  226. // keep retrying until lease's state is known or ctx is being canceled
  227. for ctx.Err() == nil {
  228. resp, err := lc.getLeaseByID(ctx, leaseID)
  229. if err != nil {
  230. // for ~v3.1 compatibilities
  231. if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
  232. return true, nil
  233. }
  234. } else {
  235. return resp.TTL == -1, nil
  236. }
  237. lc.lg.Warn(
  238. "hasLeaseExpired getLeaseByID failed",
  239. zap.String("endpoint", lc.m.EtcdClientEndpoint),
  240. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  241. zap.Error(err),
  242. )
  243. }
  244. return false, ctx.Err()
  245. }
  246. // The keys attached to the lease has the format of "<leaseID>_<idx>" where idx is the ordering key creation
  247. // Since the format of keys contains about leaseID, finding keys base on "<leaseID>" prefix
  248. // determines whether the attached keys for a given leaseID has been deleted or not
  249. func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
  250. resp, err := lc.cli.Get(ctx, fmt.Sprintf("%d", leaseID), clientv3.WithPrefix())
  251. if err != nil {
  252. lc.lg.Warn(
  253. "hasKeysAttachedToLeaseExpired failed",
  254. zap.String("endpoint", lc.m.EtcdClientEndpoint),
  255. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  256. zap.Error(err),
  257. )
  258. return false, err
  259. }
  260. return len(resp.Kvs) == 0, nil
  261. }
  262. // compositeChecker implements a checker that runs a slice of Checkers concurrently.
  263. type compositeChecker struct{ checkers []Checker }
  264. func newCompositeChecker(checkers []Checker) Checker {
  265. return &compositeChecker{checkers}
  266. }
  267. func (cchecker *compositeChecker) Check() error {
  268. errc := make(chan error)
  269. for _, c := range cchecker.checkers {
  270. go func(chk Checker) { errc <- chk.Check() }(c)
  271. }
  272. var errs []error
  273. for range cchecker.checkers {
  274. if err := <-errc; err != nil {
  275. errs = append(errs, err)
  276. }
  277. }
  278. return errsToError(errs)
  279. }
  280. type runnerChecker struct {
  281. errc chan error
  282. }
  283. func (rc *runnerChecker) Check() error {
  284. select {
  285. case err := <-rc.errc:
  286. return err
  287. default:
  288. return nil
  289. }
  290. }
  291. type noChecker struct{}
  292. func newNoChecker() Checker { return &noChecker{} }
  293. func (nc *noChecker) Check() error { return nil }