stress_lease.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "context"
  17. "fmt"
  18. "math/rand"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  23. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  24. "go.uber.org/zap"
  25. "golang.org/x/time/rate"
  26. "google.golang.org/grpc"
  27. )
const (
	// TTL is the time to live, in seconds, for the long-lived leases
	// whose keepalives are maintained by keepLeaseAlive.
	TTL = 120
	// TTLShort is the time to live, in seconds, for short-lived leases
	// that are created and then left to expire on their own.
	TTLShort = 2
)
// leaseStresser stresses the lease feature of a single etcd endpoint by
// continuously creating, renewing, and randomly revoking leases with
// attached keys.
type leaseStresser struct {
	logger *zap.Logger

	endpoint string
	cancel   func()
	conn     *grpc.ClientConn
	kvc      pb.KVClient
	lc       pb.LeaseClient
	ctx      context.Context

	rateLimiter *rate.Limiter
	// atomicModifiedKey records the number of keys created and deleted during a test case
	atomicModifiedKey int64
	// numLeases is the target population size for both the alive and
	// short-lived lease pools; keysPerLease is how many keys are
	// attached to each lease.
	numLeases    int
	keysPerLease int

	aliveLeases      *atomicLeases
	revokedLeases    *atomicLeases
	shortLivedLeases *atomicLeases

	// runWg waits for the run() loop; aliveWg waits for all
	// keepLeaseAlive goroutines.
	runWg   sync.WaitGroup
	aliveWg sync.WaitGroup
}
// atomicLeases is a goroutine-safe map from lease ID to the time the
// lease was created or last renewed.
type atomicLeases struct {
	// rwLock is used to protect read/write access of leases map
	// which are accessed and modified by different go routines.
	rwLock sync.RWMutex
	leases map[int64]time.Time
}
  58. func (al *atomicLeases) add(leaseID int64, t time.Time) {
  59. al.rwLock.Lock()
  60. al.leases[leaseID] = t
  61. al.rwLock.Unlock()
  62. }
  63. func (al *atomicLeases) update(leaseID int64, t time.Time) {
  64. al.rwLock.Lock()
  65. _, ok := al.leases[leaseID]
  66. if ok {
  67. al.leases[leaseID] = t
  68. }
  69. al.rwLock.Unlock()
  70. }
  71. func (al *atomicLeases) read(leaseID int64) (rv time.Time, ok bool) {
  72. al.rwLock.RLock()
  73. rv, ok = al.leases[leaseID]
  74. al.rwLock.RUnlock()
  75. return rv, ok
  76. }
  77. func (al *atomicLeases) remove(leaseID int64) {
  78. al.rwLock.Lock()
  79. delete(al.leases, leaseID)
  80. al.rwLock.Unlock()
  81. }
  82. func (al *atomicLeases) getLeasesMap() map[int64]time.Time {
  83. leasesCopy := make(map[int64]time.Time)
  84. al.rwLock.RLock()
  85. for k, v := range al.leases {
  86. leasesCopy[k] = v
  87. }
  88. al.rwLock.RUnlock()
  89. return leasesCopy
  90. }
  91. func (ls *leaseStresser) setupOnce() error {
  92. if ls.aliveLeases != nil {
  93. return nil
  94. }
  95. if ls.numLeases == 0 {
  96. panic("expect numLeases to be set")
  97. }
  98. if ls.keysPerLease == 0 {
  99. panic("expect keysPerLease to be set")
  100. }
  101. ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)}
  102. return nil
  103. }
// Stress dials the endpoint, (re)initializes lease bookkeeping, and
// starts the background run() loop. It returns an error only if setup
// or the gRPC dial fails; the actual stressing happens asynchronously.
func (ls *leaseStresser) Stress() error {
	ls.logger.Info(
		"lease stresser is started",
		zap.String("endpoint", ls.endpoint),
	)
	if err := ls.setupOnce(); err != nil {
		return err
	}
	conn, err := grpc.Dial(ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1*time.Second))
	if err != nil {
		return fmt.Errorf("%v (%s)", err, ls.endpoint)
	}
	ls.conn = conn
	ls.kvc = pb.NewKVClient(conn)
	ls.lc = pb.NewLeaseClient(conn)
	// revoked/short-lived pools are reset on every Stress() call;
	// aliveLeases survives (see setupOnce) so keepalives can resume
	// after a Pause().
	ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
	ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
	ctx, cancel := context.WithCancel(context.Background())
	ls.cancel = cancel
	ls.ctx = ctx
	ls.runWg.Add(1)
	go ls.run()
	return nil
}
  128. func (ls *leaseStresser) run() {
  129. defer ls.runWg.Done()
  130. ls.restartKeepAlives()
  131. for {
  132. // the number of keys created and deleted is roughly 2x the number of created keys for an iteration.
  133. // the rateLimiter therefore consumes 2x ls.numLeases*ls.keysPerLease tokens where each token represents a create/delete operation for key.
  134. err := ls.rateLimiter.WaitN(ls.ctx, 2*ls.numLeases*ls.keysPerLease)
  135. if err == context.Canceled {
  136. return
  137. }
  138. ls.logger.Debug(
  139. "lease stresser is creating leases",
  140. zap.String("endpoint", ls.endpoint),
  141. )
  142. ls.createLeases()
  143. ls.logger.Debug(
  144. "lease stresser created leases",
  145. zap.String("endpoint", ls.endpoint),
  146. )
  147. ls.logger.Debug(
  148. "lease stresser is dropped leases",
  149. zap.String("endpoint", ls.endpoint),
  150. )
  151. ls.randomlyDropLeases()
  152. ls.logger.Debug(
  153. "lease stresser dropped leases",
  154. zap.String("endpoint", ls.endpoint),
  155. )
  156. }
  157. }
  158. func (ls *leaseStresser) restartKeepAlives() {
  159. for leaseID := range ls.aliveLeases.getLeasesMap() {
  160. ls.aliveWg.Add(1)
  161. go func(id int64) {
  162. ls.keepLeaseAlive(id)
  163. }(leaseID)
  164. }
  165. }
  166. func (ls *leaseStresser) createLeases() {
  167. ls.createAliveLeases()
  168. ls.createShortLivedLeases()
  169. }
  170. func (ls *leaseStresser) createAliveLeases() {
  171. neededLeases := ls.numLeases - len(ls.aliveLeases.getLeasesMap())
  172. var wg sync.WaitGroup
  173. for i := 0; i < neededLeases; i++ {
  174. wg.Add(1)
  175. go func() {
  176. defer wg.Done()
  177. leaseID, err := ls.createLeaseWithKeys(TTL)
  178. if err != nil {
  179. ls.logger.Debug(
  180. "createLeaseWithKeys failed",
  181. zap.String("endpoint", ls.endpoint),
  182. zap.Error(err),
  183. )
  184. return
  185. }
  186. ls.aliveLeases.add(leaseID, time.Now())
  187. // keep track of all the keep lease alive go routines
  188. ls.aliveWg.Add(1)
  189. go ls.keepLeaseAlive(leaseID)
  190. }()
  191. }
  192. wg.Wait()
  193. }
  194. func (ls *leaseStresser) createShortLivedLeases() {
  195. // one round of createLeases() might not create all the short lived leases we want due to falures.
  196. // thus, we want to create remaining short lived leases in the future round.
  197. neededLeases := ls.numLeases - len(ls.shortLivedLeases.getLeasesMap())
  198. var wg sync.WaitGroup
  199. for i := 0; i < neededLeases; i++ {
  200. wg.Add(1)
  201. go func() {
  202. defer wg.Done()
  203. leaseID, err := ls.createLeaseWithKeys(TTLShort)
  204. if err != nil {
  205. return
  206. }
  207. ls.shortLivedLeases.add(leaseID, time.Now())
  208. }()
  209. }
  210. wg.Wait()
  211. }
  212. func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) {
  213. leaseID, err := ls.createLease(ttl)
  214. if err != nil {
  215. ls.logger.Debug(
  216. "createLease failed",
  217. zap.String("endpoint", ls.endpoint),
  218. zap.Error(err),
  219. )
  220. return -1, err
  221. }
  222. ls.logger.Debug(
  223. "createLease created lease",
  224. zap.String("endpoint", ls.endpoint),
  225. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  226. )
  227. if err := ls.attachKeysWithLease(leaseID); err != nil {
  228. return -1, err
  229. }
  230. return leaseID, nil
  231. }
  232. func (ls *leaseStresser) randomlyDropLeases() {
  233. var wg sync.WaitGroup
  234. for l := range ls.aliveLeases.getLeasesMap() {
  235. wg.Add(1)
  236. go func(leaseID int64) {
  237. defer wg.Done()
  238. dropped, err := ls.randomlyDropLease(leaseID)
  239. // if randomlyDropLease encountered an error such as context is cancelled, remove the lease from aliveLeases
  240. // because we can't tell whether the lease is dropped or not.
  241. if err != nil {
  242. ls.logger.Debug(
  243. "randomlyDropLease failed",
  244. zap.String("endpoint", ls.endpoint),
  245. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  246. zap.Error(err),
  247. )
  248. ls.aliveLeases.remove(leaseID)
  249. return
  250. }
  251. if !dropped {
  252. return
  253. }
  254. ls.logger.Debug(
  255. "randomlyDropLease dropped a lease",
  256. zap.String("endpoint", ls.endpoint),
  257. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  258. )
  259. ls.revokedLeases.add(leaseID, time.Now())
  260. ls.aliveLeases.remove(leaseID)
  261. }(l)
  262. }
  263. wg.Wait()
  264. }
  265. func (ls *leaseStresser) createLease(ttl int64) (int64, error) {
  266. resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: ttl})
  267. if err != nil {
  268. return -1, err
  269. }
  270. return resp.ID, nil
  271. }
  272. func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
  273. defer ls.aliveWg.Done()
  274. ctx, cancel := context.WithCancel(ls.ctx)
  275. stream, err := ls.lc.LeaseKeepAlive(ctx)
  276. defer func() { cancel() }()
  277. for {
  278. select {
  279. case <-time.After(500 * time.Millisecond):
  280. case <-ls.ctx.Done():
  281. ls.logger.Debug(
  282. "keepLeaseAlive context canceled",
  283. zap.String("endpoint", ls.endpoint),
  284. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  285. zap.Error(ls.ctx.Err()),
  286. )
  287. // it is possible that lease expires at invariant checking phase but not at keepLeaseAlive() phase.
  288. // this scenerio is possible when alive lease is just about to expire when keepLeaseAlive() exists and expires at invariant checking phase.
  289. // to circumvent that scenerio, we check each lease before keepalive loop exist to see if it has been renewed in last TTL/2 duration.
  290. // if it is renewed, this means that invariant checking have at least ttl/2 time before lease exipres which is long enough for the checking to finish.
  291. // if it is not renewed, we remove the lease from the alive map so that the lease doesn't exipre during invariant checking
  292. renewTime, ok := ls.aliveLeases.read(leaseID)
  293. if ok && renewTime.Add(TTL/2*time.Second).Before(time.Now()) {
  294. ls.aliveLeases.remove(leaseID)
  295. ls.logger.Debug(
  296. "keepLeaseAlive lease has not been renewed, dropped it",
  297. zap.String("endpoint", ls.endpoint),
  298. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  299. )
  300. }
  301. return
  302. }
  303. if err != nil {
  304. ls.logger.Debug(
  305. "keepLeaseAlive lease creates stream error",
  306. zap.String("endpoint", ls.endpoint),
  307. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  308. zap.Error(err),
  309. )
  310. cancel()
  311. ctx, cancel = context.WithCancel(ls.ctx)
  312. stream, err = ls.lc.LeaseKeepAlive(ctx)
  313. cancel()
  314. continue
  315. }
  316. ls.logger.Debug(
  317. "keepLeaseAlive stream sends lease keepalive request",
  318. zap.String("endpoint", ls.endpoint),
  319. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  320. )
  321. err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
  322. if err != nil {
  323. ls.logger.Debug(
  324. "keepLeaseAlive stream failed to send lease keepalive request",
  325. zap.String("endpoint", ls.endpoint),
  326. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  327. zap.Error(err),
  328. )
  329. continue
  330. }
  331. leaseRenewTime := time.Now()
  332. ls.logger.Debug(
  333. "keepLeaseAlive stream sent lease keepalive request",
  334. zap.String("endpoint", ls.endpoint),
  335. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  336. )
  337. respRC, err := stream.Recv()
  338. if err != nil {
  339. ls.logger.Debug(
  340. "keepLeaseAlive stream failed to receive lease keepalive response",
  341. zap.String("endpoint", ls.endpoint),
  342. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  343. zap.Error(err),
  344. )
  345. continue
  346. }
  347. // lease expires after TTL become 0
  348. // don't send keepalive if the lease has expired
  349. if respRC.TTL <= 0 {
  350. ls.logger.Debug(
  351. "keepLeaseAlive stream received lease keepalive response TTL <= 0",
  352. zap.String("endpoint", ls.endpoint),
  353. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  354. zap.Int64("ttl", respRC.TTL),
  355. )
  356. ls.aliveLeases.remove(leaseID)
  357. return
  358. }
  359. // renew lease timestamp only if lease is present
  360. ls.logger.Debug(
  361. "keepLeaseAlive renewed a lease",
  362. zap.String("endpoint", ls.endpoint),
  363. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  364. )
  365. ls.aliveLeases.update(leaseID, leaseRenewTime)
  366. }
  367. }
  368. // attachKeysWithLease function attaches keys to the lease.
  369. // the format of key is the concat of leaseID + '_' + '<order of key creation>'
  370. // e.g 5186835655248304152_0 for first created key and 5186835655248304152_1 for second created key
  371. func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error {
  372. var txnPuts []*pb.RequestOp
  373. for j := 0; j < ls.keysPerLease; j++ {
  374. txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(fmt.Sprintf("%d%s%d", leaseID, "_", j)),
  375. Value: []byte(fmt.Sprintf("bar")), Lease: leaseID}}}
  376. txnPuts = append(txnPuts, txnput)
  377. }
  378. // keep retrying until lease is not found or ctx is being canceled
  379. for ls.ctx.Err() == nil {
  380. txn := &pb.TxnRequest{Success: txnPuts}
  381. _, err := ls.kvc.Txn(ls.ctx, txn)
  382. if err == nil {
  383. // since all created keys will be deleted too, the number of operations on keys will be roughly 2x the number of created keys
  384. atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease))
  385. return nil
  386. }
  387. if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
  388. return err
  389. }
  390. }
  391. return ls.ctx.Err()
  392. }
  393. // randomlyDropLease drops the lease only when the rand.Int(2) returns 1.
  394. // This creates a 50/50 percents chance of dropping a lease
  395. func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
  396. if rand.Intn(2) != 0 {
  397. return false, nil
  398. }
  399. // keep retrying until a lease is dropped or ctx is being canceled
  400. for ls.ctx.Err() == nil {
  401. _, err := ls.lc.LeaseRevoke(ls.ctx, &pb.LeaseRevokeRequest{ID: leaseID})
  402. if err == nil || rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
  403. return true, nil
  404. }
  405. }
  406. ls.logger.Debug(
  407. "randomlyDropLease error",
  408. zap.String("endpoint", ls.endpoint),
  409. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  410. zap.Error(ls.ctx.Err()),
  411. )
  412. return false, ls.ctx.Err()
  413. }
  414. func (ls *leaseStresser) Pause() {
  415. ls.Close()
  416. }
// Close shuts the stresser down: it cancels the stresser context,
// waits for the run() loop and every keepalive goroutine to exit, and
// only then closes the gRPC connection. The ordering matters — the
// goroutines must observe the cancellation before the connection they
// use is torn down.
func (ls *leaseStresser) Close() {
	ls.logger.Info(
		"lease stresser is closing",
		zap.String("endpoint", ls.endpoint),
	)
	ls.cancel()
	ls.runWg.Wait()
	ls.aliveWg.Wait()
	ls.conn.Close()
	ls.logger.Info(
		"lease stresser is closed",
		zap.String("endpoint", ls.endpoint),
	)
}
  431. func (ls *leaseStresser) ModifiedKeys() int64 {
  432. return atomic.LoadInt64(&ls.atomicModifiedKey)
  433. }
  434. func (ls *leaseStresser) Checker() Checker {
  435. return &leaseChecker{
  436. logger: ls.logger,
  437. endpoint: ls.endpoint,
  438. ls: ls,
  439. }
  440. }