|
|
@@ -24,11 +24,16 @@ import (
|
|
|
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
|
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
|
"golang.org/x/net/context"
|
|
|
+ "golang.org/x/time/rate"
|
|
|
"google.golang.org/grpc"
|
|
|
)
|
|
|
|
|
|
-// time to live for lease
|
|
|
-const TTL = 30
|
|
|
+const (
|
|
|
+ // time to live for lease
|
|
|
+ TTL = 30
|
|
|
+ // leasesStressRoundPs indicates the rate that leaseStresser.run() creates and deletes leases per second
|
|
|
+ leasesStressRoundPs = 1
|
|
|
+)
|
|
|
|
|
|
type leaseStressConfig struct {
|
|
|
numLeases int
|
|
|
@@ -44,6 +49,8 @@ type leaseStresser struct {
|
|
|
lc pb.LeaseClient
|
|
|
ctx context.Context
|
|
|
|
|
|
+ rateLimiter *rate.Limiter
|
|
|
+
|
|
|
success int
|
|
|
failure int
|
|
|
numLeases int
|
|
|
@@ -115,10 +122,13 @@ func newLeaseStresserBuilder(s string, lsConfig *leaseStressConfig) leaseStresse
|
|
|
}
|
|
|
case "default":
|
|
|
return func(mem *member) Stresser {
|
|
|
+ // limit lease stresser to run 1 round per second
|
|
|
+ l := rate.NewLimiter(rate.Limit(leasesStressRoundPs), leasesStressRoundPs)
|
|
|
return &leaseStresser{
|
|
|
endpoint: mem.grpcAddr(),
|
|
|
numLeases: lsConfig.numLeases,
|
|
|
keysPerLease: lsConfig.keysPerLease,
|
|
|
+ rateLimiter: l,
|
|
|
}
|
|
|
}
|
|
|
default:
|
|
|
@@ -170,13 +180,16 @@ func (ls *leaseStresser) Stress() error {
|
|
|
func (ls *leaseStresser) run() {
|
|
|
defer ls.runWg.Done()
|
|
|
ls.restartKeepAlives()
|
|
|
- for ls.ctx.Err() == nil {
|
|
|
- plog.Debugf("creating lease on %v ", ls.endpoint)
|
|
|
+ for {
|
|
|
+ if err := ls.rateLimiter.Wait(ls.ctx); err == context.Canceled {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ plog.Debugf("creating lease on %v", ls.endpoint)
|
|
|
ls.createLeases()
|
|
|
- plog.Debugf("done creating lease on %v ", ls.endpoint)
|
|
|
- plog.Debugf("dropping lease on %v ", ls.endpoint)
|
|
|
+ plog.Debugf("done creating lease on %v", ls.endpoint)
|
|
|
+ plog.Debugf("dropping lease on %v", ls.endpoint)
|
|
|
ls.randomlyDropLeases()
|
|
|
- plog.Debugf("done dropping lease on %v ", ls.endpoint)
|
|
|
+ plog.Debugf("done dropping lease on %v", ls.endpoint)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -201,7 +214,7 @@ func (ls *leaseStresser) createLeases() {
|
|
|
plog.Errorf("lease creation error: (%v)", err)
|
|
|
return
|
|
|
}
|
|
|
- plog.Debugf("lease %v created ", leaseID)
|
|
|
+ plog.Debugf("lease %v created", leaseID)
|
|
|
// if attaching keys to the lease encountered an error, we don't add the lease to the aliveLeases map
|
|
|
// because invariant check on the lease will fail due to keys not found
|
|
|
if err := ls.attachKeysWithLease(leaseID); err != nil {
|
|
|
@@ -232,7 +245,7 @@ func (ls *leaseStresser) randomlyDropLeases() {
|
|
|
if !dropped {
|
|
|
return
|
|
|
}
|
|
|
- plog.Debugf("lease %v dropped ", leaseID)
|
|
|
+ plog.Debugf("lease %v dropped", leaseID)
|
|
|
ls.revokedLeases.add(leaseID, time.Now())
|
|
|
ls.aliveLeases.remove(leaseID)
|
|
|
}(l)
|
|
|
@@ -311,20 +324,20 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
|
|
|
err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
|
|
|
plog.Debugf("keepLeaseAlive stream sends lease %v keepalive request", leaseID)
|
|
|
if err != nil {
|
|
|
- plog.Debugf("keepLeaseAlive stream sends lease %v error (%v) ", leaseID, err)
|
|
|
+ plog.Debugf("keepLeaseAlive stream sends lease %v error (%v)", leaseID, err)
|
|
|
continue
|
|
|
}
|
|
|
leaseRenewTime := time.Now()
|
|
|
plog.Debugf("keepLeaseAlive stream sends lease %v keepalive request succeed", leaseID)
|
|
|
respRC, err := stream.Recv()
|
|
|
if err != nil {
|
|
|
- plog.Debugf("keepLeaseAlive stream receives lease %v stream error (%v) ", leaseID, err)
|
|
|
+ plog.Debugf("keepLeaseAlive stream receives lease %v stream error (%v)", leaseID, err)
|
|
|
continue
|
|
|
}
|
|
|
// lease expires after TTL become 0
|
|
|
// don't send keepalive if the lease has expired
|
|
|
if respRC.TTL <= 0 {
|
|
|
- plog.Debugf("keepLeaseAlive stream receives lease %v has TTL <= 0 ", leaseID)
|
|
|
+ plog.Debugf("keepLeaseAlive stream receives lease %v has TTL <= 0", leaseID)
|
|
|
ls.aliveLeases.remove(leaseID)
|
|
|
return
|
|
|
}
|