stress_lease.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "context"
  17. "fmt"
  18. "math/rand"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  23. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  24. "github.com/coreos/etcd/tools/functional-tester/rpcpb"
  25. "go.uber.org/zap"
  26. "golang.org/x/time/rate"
  27. "google.golang.org/grpc"
  28. )
  29. const (
  30. // time to live for lease
  31. TTL = 120
  32. TTLShort = 2
  33. )
  34. type leaseStresser struct {
  35. lg *zap.Logger
  36. m *rpcpb.Member
  37. cancel func()
  38. conn *grpc.ClientConn
  39. kvc pb.KVClient
  40. lc pb.LeaseClient
  41. ctx context.Context
  42. rateLimiter *rate.Limiter
  43. // atomicModifiedKey records the number of keys created and deleted during a test case
  44. atomicModifiedKey int64
  45. numLeases int
  46. keysPerLease int
  47. aliveLeases *atomicLeases
  48. revokedLeases *atomicLeases
  49. shortLivedLeases *atomicLeases
  50. runWg sync.WaitGroup
  51. aliveWg sync.WaitGroup
  52. }
  53. type atomicLeases struct {
  54. // rwLock is used to protect read/write access of leases map
  55. // which are accessed and modified by different go routines.
  56. rwLock sync.RWMutex
  57. leases map[int64]time.Time
  58. }
  59. func (al *atomicLeases) add(leaseID int64, t time.Time) {
  60. al.rwLock.Lock()
  61. al.leases[leaseID] = t
  62. al.rwLock.Unlock()
  63. }
  64. func (al *atomicLeases) update(leaseID int64, t time.Time) {
  65. al.rwLock.Lock()
  66. _, ok := al.leases[leaseID]
  67. if ok {
  68. al.leases[leaseID] = t
  69. }
  70. al.rwLock.Unlock()
  71. }
  72. func (al *atomicLeases) read(leaseID int64) (rv time.Time, ok bool) {
  73. al.rwLock.RLock()
  74. rv, ok = al.leases[leaseID]
  75. al.rwLock.RUnlock()
  76. return rv, ok
  77. }
  78. func (al *atomicLeases) remove(leaseID int64) {
  79. al.rwLock.Lock()
  80. delete(al.leases, leaseID)
  81. al.rwLock.Unlock()
  82. }
  83. func (al *atomicLeases) getLeasesMap() map[int64]time.Time {
  84. leasesCopy := make(map[int64]time.Time)
  85. al.rwLock.RLock()
  86. for k, v := range al.leases {
  87. leasesCopy[k] = v
  88. }
  89. al.rwLock.RUnlock()
  90. return leasesCopy
  91. }
  92. func (ls *leaseStresser) setupOnce() error {
  93. if ls.aliveLeases != nil {
  94. return nil
  95. }
  96. if ls.numLeases == 0 {
  97. panic("expect numLeases to be set")
  98. }
  99. if ls.keysPerLease == 0 {
  100. panic("expect keysPerLease to be set")
  101. }
  102. ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)}
  103. return nil
  104. }
  105. func (ls *leaseStresser) Stress() error {
  106. ls.lg.Info(
  107. "lease stresser is started",
  108. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  109. )
  110. if err := ls.setupOnce(); err != nil {
  111. return err
  112. }
  113. conn, err := ls.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(1 * time.Second))
  114. if err != nil {
  115. return fmt.Errorf("%v (%s)", err, ls.m.EtcdClientEndpoint)
  116. }
  117. ls.conn = conn
  118. ls.kvc = pb.NewKVClient(conn)
  119. ls.lc = pb.NewLeaseClient(conn)
  120. ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
  121. ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
  122. ctx, cancel := context.WithCancel(context.Background())
  123. ls.cancel = cancel
  124. ls.ctx = ctx
  125. ls.runWg.Add(1)
  126. go ls.run()
  127. return nil
  128. }
  129. func (ls *leaseStresser) run() {
  130. defer ls.runWg.Done()
  131. ls.restartKeepAlives()
  132. for {
  133. // the number of keys created and deleted is roughly 2x the number of created keys for an iteration.
  134. // the rateLimiter therefore consumes 2x ls.numLeases*ls.keysPerLease tokens where each token represents a create/delete operation for key.
  135. err := ls.rateLimiter.WaitN(ls.ctx, 2*ls.numLeases*ls.keysPerLease)
  136. if err == context.Canceled {
  137. return
  138. }
  139. ls.lg.Debug(
  140. "lease stresser is creating leases",
  141. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  142. )
  143. ls.createLeases()
  144. ls.lg.Debug(
  145. "lease stresser created leases",
  146. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  147. )
  148. ls.lg.Debug(
  149. "lease stresser is dropped leases",
  150. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  151. )
  152. ls.randomlyDropLeases()
  153. ls.lg.Debug(
  154. "lease stresser dropped leases",
  155. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  156. )
  157. }
  158. }
  159. func (ls *leaseStresser) restartKeepAlives() {
  160. for leaseID := range ls.aliveLeases.getLeasesMap() {
  161. ls.aliveWg.Add(1)
  162. go func(id int64) {
  163. ls.keepLeaseAlive(id)
  164. }(leaseID)
  165. }
  166. }
  167. func (ls *leaseStresser) createLeases() {
  168. ls.createAliveLeases()
  169. ls.createShortLivedLeases()
  170. }
  171. func (ls *leaseStresser) createAliveLeases() {
  172. neededLeases := ls.numLeases - len(ls.aliveLeases.getLeasesMap())
  173. var wg sync.WaitGroup
  174. for i := 0; i < neededLeases; i++ {
  175. wg.Add(1)
  176. go func() {
  177. defer wg.Done()
  178. leaseID, err := ls.createLeaseWithKeys(TTL)
  179. if err != nil {
  180. ls.lg.Debug(
  181. "createLeaseWithKeys failed",
  182. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  183. zap.Error(err),
  184. )
  185. return
  186. }
  187. ls.aliveLeases.add(leaseID, time.Now())
  188. // keep track of all the keep lease alive go routines
  189. ls.aliveWg.Add(1)
  190. go ls.keepLeaseAlive(leaseID)
  191. }()
  192. }
  193. wg.Wait()
  194. }
  195. func (ls *leaseStresser) createShortLivedLeases() {
  196. // one round of createLeases() might not create all the short lived leases we want due to falures.
  197. // thus, we want to create remaining short lived leases in the future round.
  198. neededLeases := ls.numLeases - len(ls.shortLivedLeases.getLeasesMap())
  199. var wg sync.WaitGroup
  200. for i := 0; i < neededLeases; i++ {
  201. wg.Add(1)
  202. go func() {
  203. defer wg.Done()
  204. leaseID, err := ls.createLeaseWithKeys(TTLShort)
  205. if err != nil {
  206. return
  207. }
  208. ls.shortLivedLeases.add(leaseID, time.Now())
  209. }()
  210. }
  211. wg.Wait()
  212. }
  213. func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) {
  214. leaseID, err := ls.createLease(ttl)
  215. if err != nil {
  216. ls.lg.Debug(
  217. "createLease failed",
  218. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  219. zap.Error(err),
  220. )
  221. return -1, err
  222. }
  223. ls.lg.Debug(
  224. "createLease created lease",
  225. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  226. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  227. )
  228. if err := ls.attachKeysWithLease(leaseID); err != nil {
  229. return -1, err
  230. }
  231. return leaseID, nil
  232. }
  233. func (ls *leaseStresser) randomlyDropLeases() {
  234. var wg sync.WaitGroup
  235. for l := range ls.aliveLeases.getLeasesMap() {
  236. wg.Add(1)
  237. go func(leaseID int64) {
  238. defer wg.Done()
  239. dropped, err := ls.randomlyDropLease(leaseID)
  240. // if randomlyDropLease encountered an error such as context is cancelled, remove the lease from aliveLeases
  241. // because we can't tell whether the lease is dropped or not.
  242. if err != nil {
  243. ls.lg.Debug(
  244. "randomlyDropLease failed",
  245. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  246. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  247. zap.Error(err),
  248. )
  249. ls.aliveLeases.remove(leaseID)
  250. return
  251. }
  252. if !dropped {
  253. return
  254. }
  255. ls.lg.Debug(
  256. "randomlyDropLease dropped a lease",
  257. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  258. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  259. )
  260. ls.revokedLeases.add(leaseID, time.Now())
  261. ls.aliveLeases.remove(leaseID)
  262. }(l)
  263. }
  264. wg.Wait()
  265. }
  266. func (ls *leaseStresser) createLease(ttl int64) (int64, error) {
  267. resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: ttl})
  268. if err != nil {
  269. return -1, err
  270. }
  271. return resp.ID, nil
  272. }
  273. func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
  274. defer ls.aliveWg.Done()
  275. ctx, cancel := context.WithCancel(ls.ctx)
  276. stream, err := ls.lc.LeaseKeepAlive(ctx)
  277. defer func() { cancel() }()
  278. for {
  279. select {
  280. case <-time.After(500 * time.Millisecond):
  281. case <-ls.ctx.Done():
  282. ls.lg.Debug(
  283. "keepLeaseAlive context canceled",
  284. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  285. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  286. zap.Error(ls.ctx.Err()),
  287. )
  288. // it is possible that lease expires at invariant checking phase but not at keepLeaseAlive() phase.
  289. // this scenerio is possible when alive lease is just about to expire when keepLeaseAlive() exists and expires at invariant checking phase.
  290. // to circumvent that scenerio, we check each lease before keepalive loop exist to see if it has been renewed in last TTL/2 duration.
  291. // if it is renewed, this means that invariant checking have at least ttl/2 time before lease exipres which is long enough for the checking to finish.
  292. // if it is not renewed, we remove the lease from the alive map so that the lease doesn't exipre during invariant checking
  293. renewTime, ok := ls.aliveLeases.read(leaseID)
  294. if ok && renewTime.Add(TTL/2*time.Second).Before(time.Now()) {
  295. ls.aliveLeases.remove(leaseID)
  296. ls.lg.Debug(
  297. "keepLeaseAlive lease has not been renewed, dropped it",
  298. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  299. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  300. )
  301. }
  302. return
  303. }
  304. if err != nil {
  305. ls.lg.Debug(
  306. "keepLeaseAlive lease creates stream error",
  307. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  308. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  309. zap.Error(err),
  310. )
  311. cancel()
  312. ctx, cancel = context.WithCancel(ls.ctx)
  313. stream, err = ls.lc.LeaseKeepAlive(ctx)
  314. cancel()
  315. continue
  316. }
  317. ls.lg.Debug(
  318. "keepLeaseAlive stream sends lease keepalive request",
  319. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  320. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  321. )
  322. err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
  323. if err != nil {
  324. ls.lg.Debug(
  325. "keepLeaseAlive stream failed to send lease keepalive request",
  326. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  327. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  328. zap.Error(err),
  329. )
  330. continue
  331. }
  332. leaseRenewTime := time.Now()
  333. ls.lg.Debug(
  334. "keepLeaseAlive stream sent lease keepalive request",
  335. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  336. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  337. )
  338. respRC, err := stream.Recv()
  339. if err != nil {
  340. ls.lg.Debug(
  341. "keepLeaseAlive stream failed to receive lease keepalive response",
  342. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  343. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  344. zap.Error(err),
  345. )
  346. continue
  347. }
  348. // lease expires after TTL become 0
  349. // don't send keepalive if the lease has expired
  350. if respRC.TTL <= 0 {
  351. ls.lg.Debug(
  352. "keepLeaseAlive stream received lease keepalive response TTL <= 0",
  353. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  354. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  355. zap.Int64("ttl", respRC.TTL),
  356. )
  357. ls.aliveLeases.remove(leaseID)
  358. return
  359. }
  360. // renew lease timestamp only if lease is present
  361. ls.lg.Debug(
  362. "keepLeaseAlive renewed a lease",
  363. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  364. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  365. )
  366. ls.aliveLeases.update(leaseID, leaseRenewTime)
  367. }
  368. }
  369. // attachKeysWithLease function attaches keys to the lease.
  370. // the format of key is the concat of leaseID + '_' + '<order of key creation>'
  371. // e.g 5186835655248304152_0 for first created key and 5186835655248304152_1 for second created key
  372. func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error {
  373. var txnPuts []*pb.RequestOp
  374. for j := 0; j < ls.keysPerLease; j++ {
  375. txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(fmt.Sprintf("%d%s%d", leaseID, "_", j)),
  376. Value: []byte(fmt.Sprintf("bar")), Lease: leaseID}}}
  377. txnPuts = append(txnPuts, txnput)
  378. }
  379. // keep retrying until lease is not found or ctx is being canceled
  380. for ls.ctx.Err() == nil {
  381. txn := &pb.TxnRequest{Success: txnPuts}
  382. _, err := ls.kvc.Txn(ls.ctx, txn)
  383. if err == nil {
  384. // since all created keys will be deleted too, the number of operations on keys will be roughly 2x the number of created keys
  385. atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease))
  386. return nil
  387. }
  388. if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
  389. return err
  390. }
  391. }
  392. return ls.ctx.Err()
  393. }
  394. // randomlyDropLease drops the lease only when the rand.Int(2) returns 1.
  395. // This creates a 50/50 percents chance of dropping a lease
  396. func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
  397. if rand.Intn(2) != 0 {
  398. return false, nil
  399. }
  400. // keep retrying until a lease is dropped or ctx is being canceled
  401. for ls.ctx.Err() == nil {
  402. _, err := ls.lc.LeaseRevoke(ls.ctx, &pb.LeaseRevokeRequest{ID: leaseID})
  403. if err == nil || rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
  404. return true, nil
  405. }
  406. }
  407. ls.lg.Debug(
  408. "randomlyDropLease error",
  409. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  410. zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
  411. zap.Error(ls.ctx.Err()),
  412. )
  413. return false, ls.ctx.Err()
  414. }
  415. func (ls *leaseStresser) Pause() { ls.Close() }
  416. func (ls *leaseStresser) Close() {
  417. ls.lg.Info(
  418. "lease stresser is closing",
  419. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  420. )
  421. ls.cancel()
  422. ls.runWg.Wait()
  423. ls.aliveWg.Wait()
  424. ls.conn.Close()
  425. ls.lg.Info(
  426. "lease stresser is closed",
  427. zap.String("endpoint", ls.m.EtcdClientEndpoint),
  428. )
  429. }
  430. func (ls *leaseStresser) ModifiedKeys() int64 {
  431. return atomic.LoadInt64(&ls.atomicModifiedKey)
  432. }
  433. func (ls *leaseStresser) Checker() Checker {
  434. return &leaseChecker{lg: ls.lg, m: ls.m, ls: ls}
  435. }