stresser_lease.go 13 KB


  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "context"
  17. "fmt"
  18. "math/rand"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. "go.etcd.io/etcd/clientv3"
  23. "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
  24. "go.etcd.io/etcd/functional/rpcpb"
  25. "go.uber.org/zap"
  26. "golang.org/x/time/rate"
  27. "google.golang.org/grpc"
  28. )
const (
	// defaultTTL is the time to live requested for leases that are kept
	// alive by keepLeaseAlive. keepLeaseAlive scales it by time.Second, so
	// the unit is seconds.
	defaultTTL = 120
	// defaultTTLShort is the time to live requested for short-lived leases,
	// which are created without a keepalive goroutine.
	defaultTTLShort = 2
)
// leaseStresser stresses one etcd member by repeatedly granting leases,
// attaching keys to them, keeping some alive and randomly revoking others.
type leaseStresser struct {
	// stype is reported as the "stress-type" field in log entries.
	stype rpcpb.StresserType
	lg    *zap.Logger
	// m is the member under stress; it supplies the client endpoint and
	// creates the etcd client.
	m *rpcpb.Member
	// cli is created by Stress and closed by Close.
	cli *clientv3.Client
	// ctx and cancel are replaced on every call to Stress; Close invokes
	// cancel to stop all goroutines started by the stresser.
	ctx    context.Context
	cancel func()
	// rateLimiter throttles each iteration of the run loop.
	rateLimiter *rate.Limiter
	// atomicModifiedKey records the number of keys created and deleted during a test case
	// It is only accessed through sync/atomic.
	atomicModifiedKey int64
	// numLeases is the target size of both the alive and the short-lived
	// lease sets. setupOnce panics if it is zero.
	numLeases int
	// keysPerLease is the number of keys attached to every lease.
	// setupOnce panics if it is zero.
	keysPerLease int
	// aliveLeases holds leases that have a keepalive goroutine. It is
	// allocated once by setupOnce and survives repeated Stress calls.
	aliveLeases *atomicLeases
	// revokedLeases holds leases revoked by randomlyDropLeases. It is
	// re-allocated on every Stress call.
	revokedLeases *atomicLeases
	// shortLivedLeases holds leases granted with defaultTTLShort. It is
	// re-allocated on every Stress call.
	shortLivedLeases *atomicLeases
	// runWg tracks the run goroutine.
	runWg sync.WaitGroup
	// aliveWg tracks all keepLeaseAlive goroutines.
	aliveWg sync.WaitGroup
}
  52. type atomicLeases struct {
  53. // rwLock is used to protect read/write access of leases map
  54. // which are accessed and modified by different go routines.
  55. rwLock sync.RWMutex
  56. leases map[int64]time.Time
  57. }
  58. func (al *atomicLeases) add(leaseID int64, t time.Time) {
  59. al.rwLock.Lock()
  60. al.leases[leaseID] = t
  61. al.rwLock.Unlock()
  62. }
  63. func (al *atomicLeases) update(leaseID int64, t time.Time) {
  64. al.rwLock.Lock()
  65. _, ok := al.leases[leaseID]
  66. if ok {
  67. al.leases[leaseID] = t
  68. }
  69. al.rwLock.Unlock()
  70. }
  71. func (al *atomicLeases) read(leaseID int64) (rv time.Time, ok bool) {
  72. al.rwLock.RLock()
  73. rv, ok = al.leases[leaseID]
  74. al.rwLock.RUnlock()
  75. return rv, ok
  76. }
  77. func (al *atomicLeases) remove(leaseID int64) {
  78. al.rwLock.Lock()
  79. delete(al.leases, leaseID)
  80. al.rwLock.Unlock()
  81. }
  82. func (al *atomicLeases) getLeasesMap() map[int64]time.Time {
  83. leasesCopy := make(map[int64]time.Time)
  84. al.rwLock.RLock()
  85. for k, v := range al.leases {
  86. leasesCopy[k] = v
  87. }
  88. al.rwLock.RUnlock()
  89. return leasesCopy
  90. }
  91. func (ls *leaseStresser) setupOnce() error {
  92. if ls.aliveLeases != nil {
  93. return nil
  94. }
  95. if ls.numLeases == 0 {
  96. panic("expect numLeases to be set")
  97. }
  98. if ls.keysPerLease == 0 {
  99. panic("expect keysPerLease to be set")
  100. }
  101. ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)}
  102. return nil
  103. }
  104. func (ls *leaseStresser) Stress() error {
  105. ls.lg.Info(
  106. "stress START",
  107. zap.String("stress-type", ls.stype.String()),
  108. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  109. )
  110. if err := ls.setupOnce(); err != nil {
  111. return err
  112. }
  113. ctx, cancel := context.WithCancel(context.Background())
  114. ls.ctx = ctx
  115. ls.cancel = cancel
  116. cli, err := ls.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(1 * time.Second))
  117. if err != nil {
  118. return fmt.Errorf("%v (%s)", err, ls.m.EtcdClientEndpoint)
  119. }
  120. ls.cli = cli
  121. ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
  122. ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
  123. ls.runWg.Add(1)
  124. go ls.run()
  125. return nil
  126. }
  127. func (ls *leaseStresser) run() {
  128. defer ls.runWg.Done()
  129. ls.restartKeepAlives()
  130. for {
  131. // the number of keys created and deleted is roughly 2x the number of created keys for an iteration.
  132. // the rateLimiter therefore consumes 2x ls.numLeases*ls.keysPerLease tokens where each token represents a create/delete operation for key.
  133. err := ls.rateLimiter.WaitN(ls.ctx, 2*ls.numLeases*ls.keysPerLease)
  134. if err == context.Canceled {
  135. return
  136. }
  137. ls.lg.Debug(
  138. "stress creating leases",
  139. zap.String("stress-type", ls.stype.String()),
  140. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  141. )
  142. ls.createLeases()
  143. ls.lg.Debug(
  144. "stress created leases",
  145. zap.String("stress-type", ls.stype.String()),
  146. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  147. )
  148. ls.lg.Debug(
  149. "stress dropped leases",
  150. zap.String("stress-type", ls.stype.String()),
  151. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  152. )
  153. ls.randomlyDropLeases()
  154. ls.lg.Debug(
  155. "stress dropped leases",
  156. zap.String("stress-type", ls.stype.String()),
  157. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  158. )
  159. }
  160. }
  161. func (ls *leaseStresser) restartKeepAlives() {
  162. for leaseID := range ls.aliveLeases.getLeasesMap() {
  163. ls.aliveWg.Add(1)
  164. go func(id int64) {
  165. ls.keepLeaseAlive(id)
  166. }(leaseID)
  167. }
  168. }
  169. func (ls *leaseStresser) createLeases() {
  170. ls.createAliveLeases()
  171. ls.createShortLivedLeases()
  172. }
  173. func (ls *leaseStresser) createAliveLeases() {
  174. neededLeases := ls.numLeases - len(ls.aliveLeases.getLeasesMap())
  175. var wg sync.WaitGroup
  176. for i := 0; i < neededLeases; i++ {
  177. wg.Add(1)
  178. go func() {
  179. defer wg.Done()
  180. leaseID, err := ls.createLeaseWithKeys(defaultTTL)
  181. if err != nil {
  182. ls.lg.Debug(
  183. "createLeaseWithKeys failed",
  184. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  185. zap.Error(err),
  186. )
  187. return
  188. }
  189. ls.aliveLeases.add(leaseID, time.Now())
  190. // keep track of all the keep lease alive go routines
  191. ls.aliveWg.Add(1)
  192. go ls.keepLeaseAlive(leaseID)
  193. }()
  194. }
  195. wg.Wait()
  196. }
  197. func (ls *leaseStresser) createShortLivedLeases() {
  198. // one round of createLeases() might not create all the short lived leases we want due to falures.
  199. // thus, we want to create remaining short lived leases in the future round.
  200. neededLeases := ls.numLeases - len(ls.shortLivedLeases.getLeasesMap())
  201. var wg sync.WaitGroup
  202. for i := 0; i < neededLeases; i++ {
  203. wg.Add(1)
  204. go func() {
  205. defer wg.Done()
  206. leaseID, err := ls.createLeaseWithKeys(defaultTTLShort)
  207. if err != nil {
  208. return
  209. }
  210. ls.shortLivedLeases.add(leaseID, time.Now())
  211. }()
  212. }
  213. wg.Wait()
  214. }
  215. func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) {
  216. leaseID, err := ls.createLease(ttl)
  217. if err != nil {
  218. ls.lg.Debug(
  219. "createLease failed",
  220. zap.String("stress-type", ls.stype.String()),
  221. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  222. zap.Error(err),
  223. )
  224. return -1, err
  225. }
  226. ls.lg.Debug(
  227. "createLease created lease",
  228. zap.String("stress-type", ls.stype.String()),
  229. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  230. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  231. )
  232. if err := ls.attachKeysWithLease(leaseID); err != nil {
  233. return -1, err
  234. }
  235. return leaseID, nil
  236. }
  237. func (ls *leaseStresser) randomlyDropLeases() {
  238. var wg sync.WaitGroup
  239. for l := range ls.aliveLeases.getLeasesMap() {
  240. wg.Add(1)
  241. go func(leaseID int64) {
  242. defer wg.Done()
  243. dropped, err := ls.randomlyDropLease(leaseID)
  244. // if randomlyDropLease encountered an error such as context is cancelled, remove the lease from aliveLeases
  245. // because we can't tell whether the lease is dropped or not.
  246. if err != nil {
  247. ls.lg.Debug(
  248. "randomlyDropLease failed",
  249. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  250. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  251. zap.Error(err),
  252. )
  253. ls.aliveLeases.remove(leaseID)
  254. return
  255. }
  256. if !dropped {
  257. return
  258. }
  259. ls.lg.Debug(
  260. "randomlyDropLease dropped a lease",
  261. zap.String("stress-type", ls.stype.String()),
  262. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  263. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  264. )
  265. ls.revokedLeases.add(leaseID, time.Now())
  266. ls.aliveLeases.remove(leaseID)
  267. }(l)
  268. }
  269. wg.Wait()
  270. }
  271. func (ls *leaseStresser) createLease(ttl int64) (int64, error) {
  272. resp, err := ls.cli.Grant(ls.ctx, ttl)
  273. if err != nil {
  274. return -1, err
  275. }
  276. return int64(resp.ID), nil
  277. }
  278. func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
  279. defer ls.aliveWg.Done()
  280. ctx, cancel := context.WithCancel(ls.ctx)
  281. stream, err := ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID))
  282. defer func() { cancel() }()
  283. for {
  284. select {
  285. case <-time.After(500 * time.Millisecond):
  286. case <-ls.ctx.Done():
  287. ls.lg.Debug(
  288. "keepLeaseAlive context canceled",
  289. zap.String("stress-type", ls.stype.String()),
  290. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  291. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  292. zap.Error(ls.ctx.Err()),
  293. )
  294. // it is possible that lease expires at invariant checking phase but not at keepLeaseAlive() phase.
  295. // this scenerio is possible when alive lease is just about to expire when keepLeaseAlive() exists and expires at invariant checking phase.
  296. // to circumvent that scenerio, we check each lease before keepalive loop exist to see if it has been renewed in last TTL/2 duration.
  297. // if it is renewed, this means that invariant checking have at least ttl/2 time before lease exipres which is long enough for the checking to finish.
  298. // if it is not renewed, we remove the lease from the alive map so that the lease doesn't exipre during invariant checking
  299. renewTime, ok := ls.aliveLeases.read(leaseID)
  300. if ok && renewTime.Add(defaultTTL/2*time.Second).Before(time.Now()) {
  301. ls.aliveLeases.remove(leaseID)
  302. ls.lg.Debug(
  303. "keepLeaseAlive lease has not been renewed, dropped it",
  304. zap.String("stress-type", ls.stype.String()),
  305. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  306. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  307. )
  308. }
  309. return
  310. }
  311. if err != nil {
  312. ls.lg.Debug(
  313. "keepLeaseAlive lease creates stream error",
  314. zap.String("stress-type", ls.stype.String()),
  315. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  316. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  317. zap.Error(err),
  318. )
  319. cancel()
  320. ctx, cancel = context.WithCancel(ls.ctx)
  321. stream, err = ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID))
  322. cancel()
  323. continue
  324. }
  325. if err != nil {
  326. ls.lg.Debug(
  327. "keepLeaseAlive failed to receive lease keepalive response",
  328. zap.String("stress-type", ls.stype.String()),
  329. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  330. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  331. zap.Error(err),
  332. )
  333. continue
  334. }
  335. ls.lg.Debug(
  336. "keepLeaseAlive waiting on lease stream",
  337. zap.String("stress-type", ls.stype.String()),
  338. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  339. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  340. )
  341. leaseRenewTime := time.Now()
  342. respRC := <-stream
  343. if respRC == nil {
  344. ls.lg.Debug(
  345. "keepLeaseAlive received nil lease keepalive response",
  346. zap.String("stress-type", ls.stype.String()),
  347. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  348. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  349. )
  350. continue
  351. }
  352. // lease expires after TTL become 0
  353. // don't send keepalive if the lease has expired
  354. if respRC.TTL <= 0 {
  355. ls.lg.Debug(
  356. "keepLeaseAlive stream received lease keepalive response TTL <= 0",
  357. zap.String("stress-type", ls.stype.String()),
  358. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  359. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  360. zap.Int64("ttl", respRC.TTL),
  361. )
  362. ls.aliveLeases.remove(leaseID)
  363. return
  364. }
  365. // renew lease timestamp only if lease is present
  366. ls.lg.Debug(
  367. "keepLeaseAlive renewed a lease",
  368. zap.String("stress-type", ls.stype.String()),
  369. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  370. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  371. )
  372. ls.aliveLeases.update(leaseID, leaseRenewTime)
  373. }
  374. }
  375. // attachKeysWithLease function attaches keys to the lease.
  376. // the format of key is the concat of leaseID + '_' + '<order of key creation>'
  377. // e.g 5186835655248304152_0 for first created key and 5186835655248304152_1 for second created key
  378. func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error {
  379. var txnPuts []clientv3.Op
  380. for j := 0; j < ls.keysPerLease; j++ {
  381. txnput := clientv3.OpPut(
  382. fmt.Sprintf("%d%s%d", leaseID, "_", j),
  383. fmt.Sprintf("bar"),
  384. clientv3.WithLease(clientv3.LeaseID(leaseID)),
  385. )
  386. txnPuts = append(txnPuts, txnput)
  387. }
  388. // keep retrying until lease is not found or ctx is being canceled
  389. for ls.ctx.Err() == nil {
  390. _, err := ls.cli.Txn(ls.ctx).Then(txnPuts...).Commit()
  391. if err == nil {
  392. // since all created keys will be deleted too, the number of operations on keys will be roughly 2x the number of created keys
  393. atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease))
  394. return nil
  395. }
  396. if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
  397. return err
  398. }
  399. }
  400. return ls.ctx.Err()
  401. }
  402. // randomlyDropLease drops the lease only when the rand.Int(2) returns 1.
  403. // This creates a 50/50 percents chance of dropping a lease
  404. func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
  405. if rand.Intn(2) != 0 {
  406. return false, nil
  407. }
  408. // keep retrying until a lease is dropped or ctx is being canceled
  409. for ls.ctx.Err() == nil {
  410. _, err := ls.cli.Revoke(ls.ctx, clientv3.LeaseID(leaseID))
  411. if err == nil || rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
  412. return true, nil
  413. }
  414. }
  415. ls.lg.Debug(
  416. "randomlyDropLease error",
  417. zap.String("stress-type", ls.stype.String()),
  418. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  419. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  420. zap.Error(ls.ctx.Err()),
  421. )
  422. return false, ls.ctx.Err()
  423. }
  424. func (ls *leaseStresser) Pause() map[string]int {
  425. return ls.Close()
  426. }
  427. func (ls *leaseStresser) Close() map[string]int {
  428. ls.cancel()
  429. ls.runWg.Wait()
  430. ls.aliveWg.Wait()
  431. ls.cli.Close()
  432. ls.lg.Info(
  433. "stress STOP",
  434. zap.String("stress-type", ls.stype.String()),
  435. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  436. )
  437. return nil
  438. }
  439. func (ls *leaseStresser) ModifiedKeys() int64 {
  440. return atomic.LoadInt64(&ls.atomicModifiedKey)
  441. }