lease_stresser.go

// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"

	"golang.org/x/net/context"
	"golang.org/x/time/rate"
	"google.golang.org/grpc"
)

const (
	// time to live for leases, in seconds
	TTL      = 120
	TTLShort = 2
)
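
// leaseStresser stresses a single etcd endpoint with lease traffic: it
// grants leases, attaches keys to them, keeps some alive over keepalive
// streams, lets short-lived ones expire, and randomly revokes the rest.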
type leaseStresser struct {
	endpoint string

	cancel func()
	conn   *grpc.ClientConn
	kvc    pb.KVClient
	lc     pb.LeaseClient
	ctx    context.Context

	rateLimiter *rate.Limiter
	// atomicModifiedKey records the number of keys created and deleted during a test case
	atomicModifiedKey int64

	numLeases    int
	keysPerLease int

	aliveLeases      *atomicLeases
	revokedLeases    *atomicLeases
	shortLivedLeases *atomicLeases

	runWg   sync.WaitGroup
	aliveWg sync.WaitGroup
}
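
// atomicLeases maps lease IDs to their last-renewal timestamps and is
// safe for concurrent use by the stresser's goroutines.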
type atomicLeases struct {
	// rwLock protects read/write access to the leases map,
	// which is accessed and modified by different goroutines.
	rwLock sync.RWMutex
	leases map[int64]time.Time
}

func (al *atomicLeases) add(leaseID int64, t time.Time) {
	al.rwLock.Lock()
	al.leases[leaseID] = t
	al.rwLock.Unlock()
}

func (al *atomicLeases) update(leaseID int64, t time.Time) {
	al.rwLock.Lock()
	if _, ok := al.leases[leaseID]; ok {
		al.leases[leaseID] = t
	}
	al.rwLock.Unlock()
}

func (al *atomicLeases) read(leaseID int64) (rv time.Time, ok bool) {
	al.rwLock.RLock()
	rv, ok = al.leases[leaseID]
	al.rwLock.RUnlock()
	return rv, ok
}

func (al *atomicLeases) remove(leaseID int64) {
	al.rwLock.Lock()
	delete(al.leases, leaseID)
	al.rwLock.Unlock()
}

func (al *atomicLeases) getLeasesMap() map[int64]time.Time {
	leasesCopy := make(map[int64]time.Time)
	al.rwLock.RLock()
	for k, v := range al.leases {
		leasesCopy[k] = v
	}
	al.rwLock.RUnlock()
	return leasesCopy
}
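
// setupOnce initializes the alive-leases map exactly once; later Stress()
// calls find it non-nil and reuse it, so alive leases survive a
// Pause()/Stress() cycle.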
func (ls *leaseStresser) setupOnce() error {
	if ls.aliveLeases != nil {
		return nil
	}
	if ls.numLeases == 0 {
		panic("expect numLeases to be set")
	}
	if ls.keysPerLease == 0 {
		panic("expect keysPerLease to be set")
	}
	ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)}
	return nil
}
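
// Stress dials the endpoint, resets the revoked and short-lived lease maps,
// and launches the stressing loop in a background goroutine.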
func (ls *leaseStresser) Stress() error {
	plog.Infof("lease Stresser %v starting ...", ls.endpoint)
	if err := ls.setupOnce(); err != nil {
		return err
	}
	conn, err := grpc.Dial(ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1*time.Second))
	if err != nil {
		return fmt.Errorf("%v (%s)", err, ls.endpoint)
	}
	ls.conn = conn
	ls.kvc = pb.NewKVClient(conn)
	ls.lc = pb.NewLeaseClient(conn)
	ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
	ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)}

	ctx, cancel := context.WithCancel(context.Background())
	ls.cancel = cancel
	ls.ctx = ctx

	ls.runWg.Add(1)
	go ls.run()
	return nil
}
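
// run is the main stress loop: each iteration creates a fresh batch of
// leases and then randomly revokes some of the alive ones, pacing itself
// with the shared rate limiter.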
func (ls *leaseStresser) run() {
	defer ls.runWg.Done()
	ls.restartKeepAlives()
	for {
		// the number of keys created and deleted is roughly 2x the number of created keys for an iteration.
		// the rateLimiter therefore consumes 2x ls.numLeases*ls.keysPerLease tokens, where each token represents a create/delete operation on a key.
		err := ls.rateLimiter.WaitN(ls.ctx, 2*ls.numLeases*ls.keysPerLease)
		if err == context.Canceled {
			return
		}
		plog.Debugf("creating lease on %v", ls.endpoint)
		ls.createLeases()
		plog.Debugf("done creating lease on %v", ls.endpoint)
		plog.Debugf("dropping lease on %v", ls.endpoint)
		ls.randomlyDropLeases()
		plog.Debugf("done dropping lease on %v", ls.endpoint)
	}
}
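
// restartKeepAlives resumes a keepalive goroutine for every lease that was
// still alive when the stresser was last paused.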
func (ls *leaseStresser) restartKeepAlives() {
	for leaseID := range ls.aliveLeases.getLeasesMap() {
		ls.aliveWg.Add(1)
		go func(id int64) {
			ls.keepLeaseAlive(id)
		}(leaseID)
	}
}
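
// createLeases creates a batch of long-lived leases, which are kept alive,
// and a batch of short-lived leases, which are left to expire.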
func (ls *leaseStresser) createLeases() {
	ls.createAliveLeases()
	ls.createShortLivedLeases()
}
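
// createAliveLeases tops aliveLeases back up to numLeases, spawning a
// keepalive goroutine for each newly created lease.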
func (ls *leaseStresser) createAliveLeases() {
	neededLeases := ls.numLeases - len(ls.aliveLeases.getLeasesMap())
	var wg sync.WaitGroup
	for i := 0; i < neededLeases; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			leaseID, err := ls.createLeaseWithKeys(TTL)
			if err != nil {
				plog.Debugf("lease creation error: (%v)", err)
				return
			}
			ls.aliveLeases.add(leaseID, time.Now())
			// keep track of all the keep-lease-alive goroutines
			ls.aliveWg.Add(1)
			go ls.keepLeaseAlive(leaseID)
		}()
	}
	wg.Wait()
}

func (ls *leaseStresser) createShortLivedLeases() {
	// one round of createLeases() might not create all the short-lived leases we want due to failures.
	// thus, we want to create the remaining short-lived leases in a future round.
	neededLeases := ls.numLeases - len(ls.shortLivedLeases.getLeasesMap())
	var wg sync.WaitGroup
	for i := 0; i < neededLeases; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			leaseID, err := ls.createLeaseWithKeys(TTLShort)
			if err != nil {
				return
			}
			ls.shortLivedLeases.add(leaseID, time.Now())
		}()
	}
	wg.Wait()
}
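
// createLeaseWithKeys grants one lease with the given TTL and attaches
// keysPerLease keys to it, returning the new lease's ID.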
func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) {
	leaseID, err := ls.createLease(ttl)
	if err != nil {
		plog.Debugf("lease creation error: (%v)", err)
		return -1, err
	}
	plog.Debugf("lease %v created", leaseID)
	if err := ls.attachKeysWithLease(leaseID); err != nil {
		return -1, err
	}
	return leaseID, nil
}
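
// randomlyDropLeases tries, in parallel, to revoke each alive lease with
// 50% probability, moving successfully revoked leases into revokedLeases.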
func (ls *leaseStresser) randomlyDropLeases() {
	var wg sync.WaitGroup
	for l := range ls.aliveLeases.getLeasesMap() {
		wg.Add(1)
		go func(leaseID int64) {
			defer wg.Done()
			dropped, err := ls.randomlyDropLease(leaseID)
			// if randomlyDropLease encountered an error, such as a canceled context,
			// remove the lease from aliveLeases because we can't tell whether the lease was dropped or not.
			if err != nil {
				plog.Debugf("dropping lease %v failed with error (%v)", leaseID, err)
				ls.aliveLeases.remove(leaseID)
				return
			}
			if !dropped {
				return
			}
			plog.Debugf("lease %v dropped", leaseID)
			ls.revokedLeases.add(leaseID, time.Now())
			ls.aliveLeases.remove(leaseID)
		}(l)
	}
	wg.Wait()
}
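
// createLease grants a single lease with the given TTL and returns its ID.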
func (ls *leaseStresser) createLease(ttl int64) (int64, error) {
	resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: ttl})
	if err != nil {
		return -1, err
	}
	return resp.ID, nil
}
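
// keepLeaseAlive renews the given lease every 500ms over a keepalive stream
// until the stresser's context is canceled or the lease expires, recreating
// the stream whenever it errors.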
func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
	defer ls.aliveWg.Done()
	ctx, cancel := context.WithCancel(ls.ctx)
	stream, err := ls.lc.LeaseKeepAlive(ctx)
	// the closure is needed so that the deferred call invokes the latest
	// cancel func, which is reassigned whenever the stream is recreated.
	defer func() { cancel() }()
	for {
		select {
		case <-time.After(500 * time.Millisecond):
		case <-ls.ctx.Done():
			plog.Debugf("keepLeaseAlive lease %v context canceled", leaseID)
			// it is possible that a lease expires during the invariant-checking phase but not during the keepLeaseAlive() phase.
			// this scenario arises when an alive lease is just about to expire as keepLeaseAlive() exits and then expires during invariant checking.
			// to circumvent that scenario, we check each lease before the keepalive loop exits to see if it has been renewed within the last TTL/2 duration.
			// if it has been renewed, invariant checking has at least TTL/2 before the lease expires, which is long enough for the checking to finish.
			// if it has not been renewed, we remove the lease from the alive map so that it can't expire during invariant checking.
			renewTime, ok := ls.aliveLeases.read(leaseID)
			if ok && renewTime.Add(TTL/2*time.Second).Before(time.Now()) {
				ls.aliveLeases.remove(leaseID)
				plog.Debugf("keepLeaseAlive lease %v has not been renewed. drop it.", leaseID)
			}
			return
		}

		if err != nil {
			plog.Debugf("keepLeaseAlive lease %v creates stream error: (%v)", leaseID, err)
			cancel()
			ctx, cancel = context.WithCancel(ls.ctx)
			stream, err = ls.lc.LeaseKeepAlive(ctx)
			continue
		}

		err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
		plog.Debugf("keepLeaseAlive stream sends lease %v keepalive request", leaseID)
		if err != nil {
			plog.Debugf("keepLeaseAlive stream sends lease %v error (%v)", leaseID, err)
			continue
		}
		leaseRenewTime := time.Now()
		plog.Debugf("keepLeaseAlive stream sends lease %v keepalive request succeeded", leaseID)
		respRC, err := stream.Recv()
		if err != nil {
			plog.Debugf("keepLeaseAlive stream receives lease %v stream error (%v)", leaseID, err)
			continue
		}
		// the lease expires once its TTL reaches 0;
		// don't send keepalives for an expired lease
		if respRC.TTL <= 0 {
			plog.Debugf("keepLeaseAlive stream receives lease %v has TTL <= 0", leaseID)
			ls.aliveLeases.remove(leaseID)
			return
		}
		// renew the lease timestamp only if the lease is still present
		plog.Debugf("keepLeaseAlive renew lease %v", leaseID)
		ls.aliveLeases.update(leaseID, leaseRenewTime)
	}
}

// attachKeysWithLease attaches keys to the lease.
// each key is the concatenation of the leaseID, '_', and the order of the key's creation,
// e.g. 5186835655248304152_0 for the first created key and 5186835655248304152_1 for the second
func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error {
	var txnPuts []*pb.RequestOp
	for j := 0; j < ls.keysPerLease; j++ {
		txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{
			Key:   []byte(fmt.Sprintf("%d_%d", leaseID, j)),
			Value: []byte("bar"),
			Lease: leaseID,
		}}}
		txnPuts = append(txnPuts, txnput)
	}
	// keep retrying until the lease is not found or the ctx is canceled
	for ls.ctx.Err() == nil {
		txn := &pb.TxnRequest{Success: txnPuts}
		_, err := ls.kvc.Txn(ls.ctx, txn)
		if err == nil {
			// since all created keys will be deleted too, the number of operations on keys is roughly 2x the number of created keys
			atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease))
			return nil
		}
		if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
			return err
		}
	}
	return ls.ctx.Err()
}

// randomlyDropLease revokes the lease only when rand.Intn(2) returns 0,
// giving each lease a 50% chance of being dropped
func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
	if rand.Intn(2) != 0 {
		return false, nil
	}
	// keep retrying until the lease is dropped or the ctx is canceled
	for ls.ctx.Err() == nil {
		_, err := ls.lc.LeaseRevoke(ls.ctx, &pb.LeaseRevokeRequest{ID: leaseID})
		// a lease that is already gone counts as dropped
		if err == nil || rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
			return true, nil
		}
	}
	plog.Debugf("randomlyDropLease error: (%v)", ls.ctx.Err())
	return false, ls.ctx.Err()
}
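
// Pause stops all stress activity; because aliveLeases is preserved, a
// later Stress() call resumes their keepalives via restartKeepAlives().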
func (ls *leaseStresser) Pause() {
	ls.Close()
}

// Close cancels the stresser's context, waits for the run loop and all
// keepalive goroutines to finish, and closes the gRPC connection.
func (ls *leaseStresser) Close() {
	plog.Debugf("lease stresser %q is closing...", ls.endpoint)
	ls.cancel()
	ls.runWg.Wait()
	ls.aliveWg.Wait()
	ls.conn.Close()
	plog.Infof("lease stresser %q is closed", ls.endpoint)
}
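
// ModifiedKeys reports how many keys this stresser has created and deleted
// so far.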
func (ls *leaseStresser) ModifiedKeys() int64 {
	return atomic.LoadInt64(&ls.atomicModifiedKey)
}
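
// Checker returns the lease checker that verifies lease invariants against
// this stresser's endpoint after a test round.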
func (ls *leaseStresser) Checker() Checker { return &leaseChecker{endpoint: ls.endpoint, ls: ls} }
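
// A minimal usage sketch (hypothetical values; in the etcd functional-test
// harness the stresser and its rate limiter are constructed by the tester):
//
//	ls := &leaseStresser{
//		endpoint:     "127.0.0.1:2379",
//		numLeases:    10,
//		keysPerLease: 10,
//		rateLimiter:  rate.NewLimiter(rate.Limit(5000), 5000),
//	}
//	if err := ls.Stress(); err != nil {
//		plog.Fatalf("lease stresser failed to start: %v", err)
//	}
//	time.Sleep(10 * time.Second) // let it run for a while
//	ls.Close()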