lease_stresser.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "fmt"
  17. "math/rand"
  18. "sync"
  19. "time"
  20. "github.com/coreos/etcd/clientv3"
  21. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  22. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  23. "golang.org/x/net/context"
  24. "golang.org/x/time/rate"
  25. "google.golang.org/grpc"
  26. )
const (
	// TTL is the time-to-live, in seconds, requested for every stressed
	// lease (passed as pb.LeaseGrantRequest.TTL; keepLeaseAlive also uses
	// TTL/2 as the "recently renewed" window).
	TTL = 30
	// leasesStressRoundPs indicates the rate that leaseStresser.run() creates and deletes leases per second
	leasesStressRoundPs = 1
)
// leaseStressConfig carries the tunables used to build a lease stresser.
type leaseStressConfig struct {
	numLeases    int // target number of live leases maintained per round
	keysPerLease int // number of keys attached to each created lease
	qps          int // only consumed by the "nop" stresser (nopStresser.qps)
}
// leaseStresser stresses a single etcd member by repeatedly creating
// leases, attaching keys to them, keeping them alive, and randomly
// revoking roughly half of them per round.
type leaseStresser struct {
	endpoint string          // gRPC address of the member under stress
	cancel   func()          // cancels ctx; set by Stress, invoked by Cancel
	conn     *grpc.ClientConn
	kvc      pb.KVClient
	lc       pb.LeaseClient
	// ctx is stored on the struct (against the usual convention) because
	// its lifetime spans Stress() through Cancel().
	ctx context.Context

	rateLimiter *rate.Limiter // throttles run() to leasesStressRoundPs rounds/sec
	// success/failure are returned by Report(). NOTE(review): nothing in
	// this file writes them — confirm another file updates these counters.
	success int
	failure int

	numLeases    int
	keysPerLease int

	aliveLeases   *atomicLeases // leases believed alive, keyed to last-renew time
	revokedLeases *atomicLeases // leases deliberately revoked by randomlyDropLease

	runWg   sync.WaitGroup // waits for the run() goroutine
	aliveWg sync.WaitGroup // waits for every keepLeaseAlive goroutine
}
  55. type atomicLeases struct {
  56. // rwLock is used to protect read/write access of leases map
  57. // which are accessed and modified by different go routines.
  58. rwLock sync.RWMutex
  59. leases map[int64]time.Time
  60. }
  61. func (al *atomicLeases) add(leaseID int64, t time.Time) {
  62. al.rwLock.Lock()
  63. al.leases[leaseID] = t
  64. al.rwLock.Unlock()
  65. }
  66. func (al *atomicLeases) update(leaseID int64, t time.Time) {
  67. al.rwLock.Lock()
  68. _, ok := al.leases[leaseID]
  69. if ok {
  70. al.leases[leaseID] = t
  71. }
  72. al.rwLock.Unlock()
  73. }
  74. func (al *atomicLeases) read(leaseID int64) (rv time.Time, ok bool) {
  75. al.rwLock.RLock()
  76. rv, ok = al.leases[leaseID]
  77. al.rwLock.RUnlock()
  78. return rv, ok
  79. }
  80. func (al *atomicLeases) remove(leaseID int64) {
  81. al.rwLock.Lock()
  82. delete(al.leases, leaseID)
  83. al.rwLock.Unlock()
  84. }
  85. func (al *atomicLeases) getLeasesMap() map[int64]time.Time {
  86. leasesCopy := make(map[int64]time.Time)
  87. al.rwLock.RLock()
  88. for k, v := range al.leases {
  89. leasesCopy[k] = v
  90. }
  91. al.rwLock.RUnlock()
  92. return leasesCopy
  93. }
  94. type leaseStresserBuilder func(m *member) Stresser
  95. func newLeaseStresserBuilder(s string, lsConfig *leaseStressConfig) leaseStresserBuilder {
  96. // TODO: probably need to combine newLeaseStresserBuilder with newStresserBuilder to have a unified stresser builder.
  97. switch s {
  98. case "nop":
  99. return func(*member) Stresser {
  100. return &nopStresser{
  101. start: time.Now(),
  102. qps: lsConfig.qps,
  103. }
  104. }
  105. case "default":
  106. return func(mem *member) Stresser {
  107. // limit lease stresser to run 1 round per second
  108. l := rate.NewLimiter(rate.Limit(leasesStressRoundPs), leasesStressRoundPs)
  109. return &leaseStresser{
  110. endpoint: mem.grpcAddr(),
  111. numLeases: lsConfig.numLeases,
  112. keysPerLease: lsConfig.keysPerLease,
  113. rateLimiter: l,
  114. }
  115. }
  116. default:
  117. plog.Panicf("unknown stresser type: %s\n", s)
  118. }
  119. // never reach here
  120. return nil
  121. }
  122. func (ls *leaseStresser) setupOnce() error {
  123. if ls.aliveLeases != nil {
  124. return nil
  125. }
  126. if ls.numLeases == 0 {
  127. panic("expect numLeases to be set")
  128. }
  129. if ls.keysPerLease == 0 {
  130. panic("expect keysPerLease to be set")
  131. }
  132. conn, err := grpc.Dial(ls.endpoint, grpc.WithInsecure())
  133. if err != nil {
  134. return fmt.Errorf("%v (%s)", err, ls.endpoint)
  135. }
  136. ls.conn = conn
  137. ls.kvc = pb.NewKVClient(conn)
  138. ls.lc = pb.NewLeaseClient(conn)
  139. ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)}
  140. ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
  141. return nil
  142. }
  143. func (ls *leaseStresser) Stress() error {
  144. plog.Infof("lease Stresser %v starting ...", ls.endpoint)
  145. if err := ls.setupOnce(); err != nil {
  146. return err
  147. }
  148. ctx, cancel := context.WithCancel(context.Background())
  149. ls.cancel = cancel
  150. ls.ctx = ctx
  151. ls.runWg.Add(1)
  152. go ls.run()
  153. return nil
  154. }
  155. func (ls *leaseStresser) run() {
  156. defer ls.runWg.Done()
  157. ls.restartKeepAlives()
  158. for {
  159. if err := ls.rateLimiter.Wait(ls.ctx); err == context.Canceled {
  160. return
  161. }
  162. plog.Debugf("creating lease on %v", ls.endpoint)
  163. ls.createLeases()
  164. plog.Debugf("done creating lease on %v", ls.endpoint)
  165. plog.Debugf("dropping lease on %v", ls.endpoint)
  166. ls.randomlyDropLeases()
  167. plog.Debugf("done dropping lease on %v", ls.endpoint)
  168. }
  169. }
  170. func (ls *leaseStresser) restartKeepAlives() {
  171. for leaseID := range ls.aliveLeases.getLeasesMap() {
  172. ls.aliveWg.Add(1)
  173. go func(id int64) {
  174. ls.keepLeaseAlive(id)
  175. }(leaseID)
  176. }
  177. }
  178. func (ls *leaseStresser) createLeases() {
  179. neededLeases := ls.numLeases - len(ls.aliveLeases.getLeasesMap())
  180. var wg sync.WaitGroup
  181. for i := 0; i < neededLeases; i++ {
  182. wg.Add(1)
  183. go func() {
  184. defer wg.Done()
  185. leaseID, err := ls.createLease()
  186. if err != nil {
  187. plog.Errorf("lease creation error: (%v)", err)
  188. return
  189. }
  190. plog.Debugf("lease %v created", leaseID)
  191. // if attaching keys to the lease encountered an error, we don't add the lease to the aliveLeases map
  192. // because invariant check on the lease will fail due to keys not found
  193. if err := ls.attachKeysWithLease(leaseID); err != nil {
  194. return
  195. }
  196. ls.aliveLeases.add(leaseID, time.Now())
  197. // keep track of all the keep lease alive go routines
  198. ls.aliveWg.Add(1)
  199. go ls.keepLeaseAlive(leaseID)
  200. }()
  201. }
  202. wg.Wait()
  203. }
  204. func (ls *leaseStresser) randomlyDropLeases() {
  205. var wg sync.WaitGroup
  206. for l := range ls.aliveLeases.getLeasesMap() {
  207. wg.Add(1)
  208. go func(leaseID int64) {
  209. defer wg.Done()
  210. dropped, err := ls.randomlyDropLease(leaseID)
  211. // if randomlyDropLease encountered an error such as context is cancelled, remove the lease from aliveLeases
  212. // becasue we can't tell whether the lease is dropped or not.
  213. if err != nil {
  214. ls.aliveLeases.remove(leaseID)
  215. return
  216. }
  217. if !dropped {
  218. return
  219. }
  220. plog.Debugf("lease %v dropped", leaseID)
  221. ls.revokedLeases.add(leaseID, time.Now())
  222. ls.aliveLeases.remove(leaseID)
  223. }(l)
  224. }
  225. wg.Wait()
  226. }
  227. func (ls *leaseStresser) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) {
  228. ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true}
  229. return ls.lc.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false))
  230. }
  231. func (ls *leaseStresser) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
  232. resp, err := ls.getLeaseByID(ctx, leaseID)
  233. plog.Debugf("hasLeaseExpired %v resp %v error (%v)", leaseID, resp, err)
  234. if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
  235. return true, nil
  236. }
  237. return false, err
  238. }
  239. // The keys attached to the lease has the format of "<leaseID>_<idx>" where idx is the ordering key creation
  240. // Since the format of keys contains about leaseID, finding keys base on "<leaseID>" prefix
  241. // determines whether the attached keys for a given leaseID has been deleted or not
  242. func (ls *leaseStresser) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
  243. // plog.Infof("retriving keys attached to lease %v", leaseID)
  244. resp, err := ls.kvc.Range(ctx, &pb.RangeRequest{
  245. Key: []byte(fmt.Sprintf("%d", leaseID)),
  246. RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))),
  247. }, grpc.FailFast(false))
  248. plog.Debugf("hasKeysAttachedToLeaseExpired %v resp %v error (%v)", leaseID, resp, err)
  249. if err != nil {
  250. plog.Errorf("retriving keys attached to lease %v error: (%v)", leaseID, err)
  251. return false, err
  252. }
  253. return len(resp.Kvs) == 0, nil
  254. }
  255. func (ls *leaseStresser) createLease() (int64, error) {
  256. resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: TTL})
  257. if err != nil {
  258. return -1, err
  259. }
  260. return resp.ID, nil
  261. }
  262. func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
  263. defer ls.aliveWg.Done()
  264. ctx, cancel := context.WithCancel(ls.ctx)
  265. stream, err := ls.lc.LeaseKeepAlive(ctx)
  266. for {
  267. select {
  268. case <-time.After(500 * time.Millisecond):
  269. case <-ls.ctx.Done():
  270. plog.Debugf("keepLeaseAlive lease %v context canceled ", leaseID)
  271. // it is possible that lease expires at invariant checking phase but not at keepLeaseAlive() phase.
  272. // this scenerio is possible when alive lease is just about to expire when keepLeaseAlive() exists and expires at invariant checking phase.
  273. // to circumvent that scenerio, we check each lease before keepalive loop exist to see if it has been renewed in last TTL/2 duration.
  274. // if it is renewed, this means that invariant checking have at least ttl/2 time before lease exipres which is long enough for the checking to finish.
  275. // if it is not renewed, we remove the lease from the alive map so that the lease doesn't exipre during invariant checking
  276. renewTime, ok := ls.aliveLeases.read(leaseID)
  277. if ok && renewTime.Add(TTL/2*time.Second).Before(time.Now()) {
  278. ls.aliveLeases.remove(leaseID)
  279. plog.Debugf("keepLeaseAlive lease %v has not been renewed. drop it.", leaseID)
  280. }
  281. return
  282. }
  283. if err != nil {
  284. plog.Debugf("keepLeaseAlive lease %v creates stream error: (%v)", leaseID, err)
  285. cancel()
  286. ctx, cancel = context.WithCancel(ls.ctx)
  287. stream, err = ls.lc.LeaseKeepAlive(ctx)
  288. continue
  289. }
  290. err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
  291. plog.Debugf("keepLeaseAlive stream sends lease %v keepalive request", leaseID)
  292. if err != nil {
  293. plog.Debugf("keepLeaseAlive stream sends lease %v error (%v)", leaseID, err)
  294. continue
  295. }
  296. leaseRenewTime := time.Now()
  297. plog.Debugf("keepLeaseAlive stream sends lease %v keepalive request succeed", leaseID)
  298. respRC, err := stream.Recv()
  299. if err != nil {
  300. plog.Debugf("keepLeaseAlive stream receives lease %v stream error (%v)", leaseID, err)
  301. continue
  302. }
  303. // lease expires after TTL become 0
  304. // don't send keepalive if the lease has expired
  305. if respRC.TTL <= 0 {
  306. plog.Debugf("keepLeaseAlive stream receives lease %v has TTL <= 0", leaseID)
  307. ls.aliveLeases.remove(leaseID)
  308. return
  309. }
  310. // renew lease timestamp only if lease is present
  311. plog.Debugf("keepLeaseAlive renew lease %v", leaseID)
  312. ls.aliveLeases.update(leaseID, leaseRenewTime)
  313. }
  314. }
  315. // attachKeysWithLease function attaches keys to the lease.
  316. // the format of key is the concat of leaseID + '_' + '<order of key creation>'
  317. // e.g 5186835655248304152_0 for first created key and 5186835655248304152_1 for second created key
  318. func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error {
  319. var txnPuts []*pb.RequestOp
  320. for j := 0; j < ls.keysPerLease; j++ {
  321. txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(fmt.Sprintf("%d%s%d", leaseID, "_", j)),
  322. Value: []byte(fmt.Sprintf("bar")), Lease: leaseID}}}
  323. txnPuts = append(txnPuts, txnput)
  324. }
  325. // keep retrying until lease is not found or ctx is being canceled
  326. for ls.ctx.Err() == nil {
  327. txn := &pb.TxnRequest{Success: txnPuts}
  328. _, err := ls.kvc.Txn(ls.ctx, txn)
  329. if err == nil {
  330. return nil
  331. }
  332. if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
  333. return err
  334. }
  335. }
  336. return ls.ctx.Err()
  337. }
  338. // randomlyDropLease drops the lease only when the rand.Int(2) returns 1.
  339. // This creates a 50/50 percents chance of dropping a lease
  340. func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
  341. if rand.Intn(2) != 0 {
  342. return false, nil
  343. }
  344. // keep retrying until a lease is dropped or ctx is being canceled
  345. for ls.ctx.Err() == nil {
  346. _, err := ls.lc.LeaseRevoke(ls.ctx, &pb.LeaseRevokeRequest{ID: leaseID})
  347. if err == nil || rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
  348. return true, nil
  349. }
  350. }
  351. plog.Debugf("randomlyDropLease error: (%v)", ls.ctx.Err())
  352. return false, ls.ctx.Err()
  353. }
  354. func (ls *leaseStresser) Cancel() {
  355. plog.Debugf("lease stresser %q is canceling...", ls.endpoint)
  356. ls.cancel()
  357. ls.runWg.Wait()
  358. ls.aliveWg.Wait()
  359. plog.Infof("lease stresser %q is canceled", ls.endpoint)
  360. }
// Report returns the accumulated success and failure counts.
// NOTE(review): nothing in this file writes ls.success or ls.failure, so
// this appears to always report (0, 0) — confirm whether another file
// updates these counters or they are vestigial.
func (ls *leaseStresser) Report() (int, int) {
	return ls.success, ls.failure
}