浏览代码

functional-tester: add rate limiter to lease stresser

too many leases created can cause compaction to timeout. adding a rate limiter limits number of leases and attched keys.
fanmin shi 9 年之前
父节点
当前提交
d582fdcc1b
共有 1 个文件被更改,包括 25 次插入12 次删除
  1. 25 12
      tools/functional-tester/etcd-tester/lease_stresser.go

+ 25 - 12
tools/functional-tester/etcd-tester/lease_stresser.go

@@ -24,11 +24,16 @@ import (
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
+	"golang.org/x/time/rate"
 	"google.golang.org/grpc"
 )
 
-// time to live for lease
-const TTL = 30
+const (
+	// time to live for lease
+	TTL = 30
+	// leasesStressRoundPs indicates the rate that leaseStresser.run() creates and deletes leases per second
+	leasesStressRoundPs = 1
+)
 
 type leaseStressConfig struct {
 	numLeases    int
@@ -44,6 +49,8 @@ type leaseStresser struct {
 	lc       pb.LeaseClient
 	ctx      context.Context
 
+	rateLimiter *rate.Limiter
+
 	success      int
 	failure      int
 	numLeases    int
@@ -115,10 +122,13 @@ func newLeaseStresserBuilder(s string, lsConfig *leaseStressConfig) leaseStresse
 		}
 	case "default":
 		return func(mem *member) Stresser {
+			// limit lease stresser to run 1 round per second
+			l := rate.NewLimiter(rate.Limit(leasesStressRoundPs), leasesStressRoundPs)
 			return &leaseStresser{
 				endpoint:     mem.grpcAddr(),
 				numLeases:    lsConfig.numLeases,
 				keysPerLease: lsConfig.keysPerLease,
+				rateLimiter:  l,
 			}
 		}
 	default:
@@ -170,13 +180,16 @@ func (ls *leaseStresser) Stress() error {
 func (ls *leaseStresser) run() {
 	defer ls.runWg.Done()
 	ls.restartKeepAlives()
-	for ls.ctx.Err() == nil {
-		plog.Debugf("creating lease on %v ", ls.endpoint)
+	for {
+		if err := ls.rateLimiter.Wait(ls.ctx); err == context.Canceled {
+			return
+		}
+		plog.Debugf("creating lease on %v", ls.endpoint)
 		ls.createLeases()
-		plog.Debugf("done creating lease on %v ", ls.endpoint)
-		plog.Debugf("dropping lease on %v ", ls.endpoint)
+		plog.Debugf("done creating lease on %v", ls.endpoint)
+		plog.Debugf("dropping lease on %v", ls.endpoint)
 		ls.randomlyDropLeases()
-		plog.Debugf("done dropping lease on %v ", ls.endpoint)
+		plog.Debugf("done dropping lease on %v", ls.endpoint)
 	}
 }
 
@@ -201,7 +214,7 @@ func (ls *leaseStresser) createLeases() {
 				plog.Errorf("lease creation error: (%v)", err)
 				return
 			}
-			plog.Debugf("lease %v created ", leaseID)
+			plog.Debugf("lease %v created", leaseID)
 			// if attaching keys to the lease encountered an error, we don't add the lease to the aliveLeases map
 			// because invariant check on the lease will fail due to keys not found
 			if err := ls.attachKeysWithLease(leaseID); err != nil {
@@ -232,7 +245,7 @@ func (ls *leaseStresser) randomlyDropLeases() {
 			if !dropped {
 				return
 			}
-			plog.Debugf("lease %v dropped ", leaseID)
+			plog.Debugf("lease %v dropped", leaseID)
 			ls.revokedLeases.add(leaseID, time.Now())
 			ls.aliveLeases.remove(leaseID)
 		}(l)
@@ -311,20 +324,20 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 		err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
 		plog.Debugf("keepLeaseAlive stream sends lease %v keepalive request", leaseID)
 		if err != nil {
-			plog.Debugf("keepLeaseAlive stream sends lease %v error (%v) ", leaseID, err)
+			plog.Debugf("keepLeaseAlive stream sends lease %v error (%v)", leaseID, err)
 			continue
 		}
 		leaseRenewTime := time.Now()
 		plog.Debugf("keepLeaseAlive stream sends lease %v keepalive request succeed", leaseID)
 		respRC, err := stream.Recv()
 		if err != nil {
-			plog.Debugf("keepLeaseAlive stream receives lease %v stream error (%v) ", leaseID, err)
+			plog.Debugf("keepLeaseAlive stream receives lease %v stream error (%v)", leaseID, err)
 			continue
 		}
 		// lease expires after TTL become 0
 		// don't send keepalive if the lease has expired
 		if respRC.TTL <= 0 {
-			plog.Debugf("keepLeaseAlive stream receives lease %v has TTL <= 0 ", leaseID)
+			plog.Debugf("keepLeaseAlive stream receives lease %v has TTL <= 0", leaseID)
 			ls.aliveLeases.remove(leaseID)
 			return
 		}