|
|
@@ -30,7 +30,8 @@ import (
|
|
|
|
|
|
const (
|
|
|
// time to live for lease
|
|
|
- TTL = 120
|
|
|
+ TTL = 120
|
|
|
+ TTLShort = 2
|
|
|
// leasesStressRoundPs indicates the rate that leaseStresser.run() creates and deletes leases per second
|
|
|
leasesStressRoundPs = 1
|
|
|
)
|
|
|
@@ -56,8 +57,9 @@ type leaseStresser struct {
|
|
|
numLeases int
|
|
|
keysPerLease int
|
|
|
|
|
|
- aliveLeases *atomicLeases
|
|
|
- revokedLeases *atomicLeases
|
|
|
+ aliveLeases *atomicLeases
|
|
|
+ revokedLeases *atomicLeases
|
|
|
+ shortLivedLeases *atomicLeases
|
|
|
|
|
|
runWg sync.WaitGroup
|
|
|
aliveWg sync.WaitGroup
|
|
|
@@ -158,7 +160,7 @@ func (ls *leaseStresser) setupOnce() error {
|
|
|
ls.lc = pb.NewLeaseClient(conn)
|
|
|
|
|
|
ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)}
|
|
|
- ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
|
|
|
+
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
@@ -167,6 +169,8 @@ func (ls *leaseStresser) Stress() error {
|
|
|
if err := ls.setupOnce(); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+ ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
|
|
|
+ ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
|
|
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
ls.cancel = cancel
|
|
|
@@ -203,24 +207,22 @@ func (ls *leaseStresser) restartKeepAlives() {
|
|
|
}
|
|
|
|
|
|
func (ls *leaseStresser) createLeases() {
|
|
|
+ ls.createAliveLeases()
|
|
|
+ ls.createShortLivedLeases()
|
|
|
+}
|
|
|
+
|
|
|
+func (ls *leaseStresser) createAliveLeases() {
|
|
|
neededLeases := ls.numLeases - len(ls.aliveLeases.getLeasesMap())
|
|
|
var wg sync.WaitGroup
|
|
|
for i := 0; i < neededLeases; i++ {
|
|
|
wg.Add(1)
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
- leaseID, err := ls.createLease()
|
|
|
+ leaseID, err := ls.createLeaseWithKeys(TTL)
|
|
|
if err != nil {
|
|
|
plog.Debugf("lease creation error: (%v)", err)
|
|
|
return
|
|
|
}
|
|
|
- plog.Debugf("lease %v created", leaseID)
|
|
|
- // if attaching keys to the lease encountered an error, we don't add the lease to the aliveLeases map
|
|
|
- // because invariant check on the lease will fail due to keys not found
|
|
|
- if err := ls.attachKeysWithLease(leaseID); err != nil {
|
|
|
- plog.Debugf("unable to attach keys to lease %d error (%v)", leaseID, err)
|
|
|
- return
|
|
|
- }
|
|
|
ls.aliveLeases.add(leaseID, time.Now())
|
|
|
// keep track of all the keep lease alive go routines
|
|
|
ls.aliveWg.Add(1)
|
|
|
@@ -230,6 +232,38 @@ func (ls *leaseStresser) createLeases() {
|
|
|
wg.Wait()
|
|
|
}
|
|
|
|
|
|
+func (ls *leaseStresser) createShortLivedLeases() {
|
|
|
+ // one round of createLeases() might not create all the short lived leases we want due to falures.
|
|
|
+ // thus, we want to create remaining short lived leases in the future round.
|
|
|
+ neededLeases := ls.numLeases - len(ls.shortLivedLeases.getLeasesMap())
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ for i := 0; i < neededLeases; i++ {
|
|
|
+ wg.Add(1)
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+ leaseID, err := ls.createLeaseWithKeys(TTLShort)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ ls.shortLivedLeases.add(leaseID, time.Now())
|
|
|
+ }()
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+}
|
|
|
+
|
|
|
+func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) {
|
|
|
+ leaseID, err := ls.createLease(ttl)
|
|
|
+ if err != nil {
|
|
|
+ plog.Debugf("lease creation error: (%v)", err)
|
|
|
+ return -1, err
|
|
|
+ }
|
|
|
+ plog.Debugf("lease %v created ", leaseID)
|
|
|
+ if err := ls.attachKeysWithLease(leaseID); err != nil {
|
|
|
+ return -1, err
|
|
|
+ }
|
|
|
+ return leaseID, nil
|
|
|
+}
|
|
|
+
|
|
|
func (ls *leaseStresser) randomlyDropLeases() {
|
|
|
var wg sync.WaitGroup
|
|
|
for l := range ls.aliveLeases.getLeasesMap() {
|
|
|
@@ -285,8 +319,8 @@ func (ls *leaseStresser) hasKeysAttachedToLeaseExpired(ctx context.Context, leas
|
|
|
return len(resp.Kvs) == 0, nil
|
|
|
}
|
|
|
|
|
|
-func (ls *leaseStresser) createLease() (int64, error) {
|
|
|
- resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: TTL})
|
|
|
+func (ls *leaseStresser) createLease(ttl int64) (int64, error) {
|
|
|
+ resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: ttl})
|
|
|
if err != nil {
|
|
|
return -1, err
|
|
|
}
|
|
|
@@ -394,6 +428,9 @@ func (ls *leaseStresser) Cancel() {
|
|
|
ls.cancel()
|
|
|
ls.runWg.Wait()
|
|
|
ls.aliveWg.Wait()
|
|
|
+ // we sleep for TTLShort seconds to make sure leases in shortLivedLeases are expired
|
|
|
+ // leaseChecker will then verify that those leases are indeed expired
|
|
|
+ time.Sleep(TTLShort * time.Second)
|
|
|
plog.Infof("lease stresser %q is canceled", ls.endpoint)
|
|
|
}
|
|
|
|