
etcd-tester: refactor stresser/checker organization

The checkers and stressers should be composable without special cases; this
patch tries to address that while refactoring out some old cruft.

Namely,
* Single stresser/checker for a tester; built from composition
* Composite stresser via comma-separated list of stressers
* Split stressers into separate files
* Removed v2-only flags and special cases
* Rate limiter shared among key stresser and lease stresser
* Composite checker is now concurrent
* Stresser can return a Checker to check its invariants
* Each lease checker only operates on a single lease stresser
Anthony Romano · 9 years ago · commit 308f2a1695
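
For orientation, a minimal sketch of the composition the change list describes: every Stresser can hand back a Checker for its own invariants, so composites need no special cases. The Stresser interface is taken verbatim from stresser.go below; the one-method Checker interface is inferred from how newCompositeChecker uses it.

	// Checker verifies an invariant; inferred from usage in checks.go.
	type Checker interface {
		Check() error
	}

	// Stresser as declared in stresser.go in this commit.
	type Stresser interface {
		// Stress starts to stress the etcd cluster
		Stress() error
		// Cancel stops the stress test on the etcd cluster
		Cancel()
		// Report reports the success and failure of the stress test
		Report() (success int, failure int)
		// Checker returns an invariant checker to run after the
		// stresser is canceled; nil if there are no invariants.
		Checker() Checker
	}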

+ 21 - 64
tools/functional-tester/etcd-tester/checks.go

@@ -17,7 +17,6 @@ package main
 import (
 	"fmt"
 	"reflect"
-	"strings"
 	"time"
 
 	"golang.org/x/net/context"
@@ -133,76 +132,31 @@ func (hc *hashChecker) Check() error {
 	return hc.checkRevAndHashes()
 }
 
-type leaseChecker struct {
-	leaseStressers []Stresser
-}
-
-func newLeaseChecker(leaseStressers []Stresser) Checker { return &leaseChecker{leaseStressers} }
+type leaseChecker struct{ ls *leaseStresser }
 
 func (lc *leaseChecker) Check() error {
-	plog.Info("lease stresser invariant check...")
-	errc := make(chan error)
-	for _, ls := range lc.leaseStressers {
-		go func(s Stresser) { errc <- lc.checkInvariant(s) }(ls)
-	}
-	var errs []error
-	for i := 0; i < len(lc.leaseStressers); i++ {
-		if err := <-errc; err != nil {
-			errs = append(errs, err)
-		}
-	}
-
-	if len(errs) == 0 {
-		return nil
-	}
-	return fmt.Errorf("lease stresser encounters error: (%v)", fromErrsToString(errs))
-}
-
-func fromErrsToString(errs []error) string {
-	stringArr := make([]string, len(errs))
-	for i, err := range errs {
-		stringArr[i] = err.Error()
-	}
-	return strings.Join(stringArr, ",")
-}
-
-func (lc *leaseChecker) checkInvariant(lStresser Stresser) error {
-	ls := lStresser.(*leaseStresser)
-	if err := checkLeasesExpired(ls); err != nil {
+	plog.Infof("checking revoked leases %v", lc.ls.revokedLeases.leases)
+	if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil {
 		return err
 	}
-	if err := checkLeasesAlive(ls); err != nil {
+	plog.Infof("checking alive leases %v", lc.ls.aliveLeases.leases)
+	if err := lc.check(false, lc.ls.aliveLeases.leases); err != nil {
 		return err
 	}
-	return checkShortLivedLeases(ls)
-}
-
-func checkLeasesExpired(ls *leaseStresser) error {
-	plog.Infof("revoked leases %v", ls.revokedLeases.getLeasesMap())
-	return checkLeases(true, ls, ls.revokedLeases.getLeasesMap())
-}
-
-func checkLeasesAlive(ls *leaseStresser) error {
-	plog.Infof("alive leases %v", ls.aliveLeases.getLeasesMap())
-	return checkLeases(false, ls, ls.aliveLeases.getLeasesMap())
-}
-
-// checkShortLivedLeases() verifies that the short lived leases are indeed being deleted.
-func checkShortLivedLeases(ls *leaseStresser) error {
-	plog.Infof("short lived leases %v", ls.shortLivedLeases.getLeasesMap())
-	return checkLeases(true, ls, ls.shortLivedLeases.getLeasesMap())
+	plog.Infof("checking short lived leases %v", lc.ls.shortLivedLeases.leases)
+	return lc.check(true, lc.ls.shortLivedLeases.leases)
 }
 
-func checkLeases(expired bool, ls *leaseStresser, leases map[int64]time.Time) error {
+func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error {
 	ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout)
 	defer cancel()
 	for leaseID := range leases {
-		keysExpired, err := ls.hasKeysAttachedToLeaseExpired(ctx, leaseID)
+		keysExpired, err := lc.ls.hasKeysAttachedToLeaseExpired(ctx, leaseID)
 		if err != nil {
 			plog.Errorf("hasKeysAttachedToLeaseExpired error: (%v)", err)
 			return err
 		}
-		leaseExpired, err := ls.hasLeaseExpired(ctx, leaseID)
+		leaseExpired, err := lc.ls.hasLeaseExpired(ctx, leaseID)
 		if err != nil {
 			plog.Errorf("hasLeaseExpired error: (%v)", err)
 			return err
@@ -217,22 +171,25 @@ func checkLeases(expired bool, ls *leaseStresser, leases map[int64]time.Time) er
 	return nil
 }
 
-type compositeChecker struct {
-	checkers []Checker
-}
+// compositeChecker implements a checker that runs a slice of Checkers concurrently.
+type compositeChecker struct{ checkers []Checker }
 
 func newCompositeChecker(checkers []Checker) Checker {
 	return &compositeChecker{checkers}
 }
 
 func (cchecker *compositeChecker) Check() error {
-	for _, checker := range cchecker.checkers {
-		if err := checker.Check(); err != nil {
-			return err
+	errc := make(chan error)
+	for _, c := range cchecker.checkers {
+		go func(chk Checker) { errc <- chk.Check() }(c)
+	}
+	var errs []error
+	for range cchecker.checkers {
+		if err := <-errc; err != nil {
+			errs = append(errs, err)
 		}
 	}
-
-	return nil
+	return errsToError(errs)
 }
 
 type noChecker struct{}

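The new compositeChecker.Check fans each sub-checker out on its own goroutine and drains exactly one error per checker before aggregating. A self-contained sketch of the same fan-out/fan-in pattern (the checkFn type and runConcurrently helper are ours, for illustration only):

	package main

	import (
		"errors"
		"fmt"
	)

	type checkFn func() error

	// runConcurrently mirrors compositeChecker.Check: one goroutine per
	// check, one receive per check, errors aggregated at the end.
	func runConcurrently(checks []checkFn) error {
		errc := make(chan error)
		for _, c := range checks {
			go func(f checkFn) { errc <- f() }(c)
		}
		var errs []error
		for range checks {
			if err := <-errc; err != nil {
				errs = append(errs, err)
			}
		}
		if len(errs) == 0 {
			return nil
		}
		return errors.New(fmt.Sprint(errs))
	}

	func main() {
		checks := []checkFn{
			func() error { return nil },
			func() error { return errors.New("hash mismatch") },
		}
		fmt.Println(runConcurrently(checks)) // prints the aggregated error
	}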
+ 18 - 19
tools/functional-tester/etcd-tester/cluster.go

@@ -40,7 +40,6 @@ type agentConfig struct {
 
 type cluster struct {
 	agents  []agentConfig
-	v2Only  bool // to be deprecated
 	Size    int
 	Members []*member
 }
@@ -105,13 +104,9 @@ func (c *cluster) WaitHealth() error {
 	// TODO: set it to a reasonable value. It is set that high because
 	// follower may use long time to catch up the leader when reboot under
 	// reasonable workload (https://github.com/coreos/etcd/issues/2698)
-	healthFunc := func(m *member) error { return m.SetHealthKeyV3() }
-	if c.v2Only {
-		healthFunc = func(m *member) error { return m.SetHealthKeyV2() }
-	}
 	for i := 0; i < 60; i++ {
 		for _, m := range c.Members {
-			if err = healthFunc(m); err != nil {
+			if err = m.SetHealthKeyV3(); err != nil {
 				break
 			}
 		}
@@ -126,9 +121,6 @@ func (c *cluster) WaitHealth() error {
 
 // GetLeader returns the index of leader and error if any.
 func (c *cluster) GetLeader() (int, error) {
-	if c.v2Only {
-		return 0, nil
-	}
 	for i, m := range c.Members {
 		isLeader, err := m.IsLeader()
 		if isLeader || err != nil {
@@ -172,20 +164,27 @@ func (c *cluster) Status() ClusterStatus {
 	return cs
 }
 
+// maxRev returns the maximum revision found on the cluster.
 func (c *cluster) maxRev() (rev int64, err error) {
-	for _, m := range c.Members {
-		curRev, _, curErr := m.RevHash()
-		if curErr != nil {
-			err = curErr
+	ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
+	defer cancel()
+	revc, errc := make(chan int64, len(c.Members)), make(chan error, len(c.Members))
+	for i := range c.Members {
+		go func(m *member) {
+			mrev, merr := m.Rev(ctx)
+			revc <- mrev
+			errc <- merr
+		}(c.Members[i])
+	}
+	for i := 0; i < len(c.Members); i++ {
+		if merr := <-errc; merr != nil {
+			err = merr
 		}
-		if curRev > rev {
-			rev = curRev
+		if mrev := <-revc; mrev > rev {
+			rev = mrev
 		}
 	}
-	if rev == 0 {
-		return 0, err
-	}
-	return rev, nil
+	return rev, err
 }
 
 func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {

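Note the design choice in the new maxRev: both channels are buffered to len(c.Members), so each per-member goroutine can complete its sends even after the one-second deadline fires, avoiding leaked goroutines. maxRev also returns the highest revision it did receive alongside any error, which is what lets failure.go below treat rev == 0, rather than err != nil, as the retry signal.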
+ 6 - 4
tools/functional-tester/etcd-tester/failure.go

@@ -139,8 +139,12 @@ func (f *failureUntilSnapshot) Inject(c *cluster, round int) error {
 	if c.Size < 3 {
 		return nil
 	}
+	// maxRev may fail right after a failure is injected; retry on failure.
 	startRev, err := c.maxRev()
-	if err != nil {
+	for i := 0; i < 10 && startRev == 0; i++ {
+		startRev, err = c.maxRev()
+	}
+	if startRev == 0 {
 		return err
 	}
 	lastRev := startRev
@@ -148,9 +152,7 @@ func (f *failureUntilSnapshot) Inject(c *cluster, round int) error {
 	// Give it 3-times time to create a new snapshot.
 	retry := snapshotCount / 1000 * 3
 	for j := 0; j < retry; j++ {
-		if lastRev, err = c.maxRev(); err != nil {
-			return err
-		}
+		lastRev, err = c.maxRev()
 		// If the number of proposals committed is bigger than snapshot count,
 		// a new snapshot should have been created.
 		if lastRev-startRev > snapshotCount {

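failureUntilSnapshot now treats a zero startRev, rather than a non-nil error, as the retry signal, since maxRev reports partial results. The same bounded-retry shape as a generic sketch (the retryUntil helper is ours, not the tester's):

	// retryUntil calls f up to attempts times, stopping early once ok
	// reports success; it returns the last error observed.
	func retryUntil(attempts int, f func() (ok bool, err error)) error {
		var err error
		for i := 0; i < attempts; i++ {
			var ok bool
			if ok, err = f(); ok {
				return nil
			}
		}
		return err
	}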
+ 249 - 0
tools/functional-tester/etcd-tester/key_stresser.go

@@ -0,0 +1,249 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"fmt"
+	"math/rand"
+	"sync"
+	"time"
+
+	"golang.org/x/net/context" // grpc does a comparison on context.Cancel; can't use "context" package
+	"golang.org/x/time/rate"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/transport"
+
+	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+type keyStresser struct {
+	Endpoint string
+
+	keyLargeSize   int
+	keySize        int
+	keySuffixRange int
+
+	N int
+
+	rateLimiter *rate.Limiter
+
+	mu sync.Mutex
+	wg sync.WaitGroup
+
+	cancel func()
+	conn   *grpc.ClientConn
+
+	success int
+	failure int
+
+	stressTable *stressTable
+}
+
+func (s *keyStresser) Stress() error {
+	// TODO: add backoff option
+	conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure())
+	if err != nil {
+		return fmt.Errorf("%v (%s)", err, s.Endpoint)
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+
+	s.wg.Add(s.N)
+	s.conn = conn
+	s.cancel = cancel
+
+	kvc := pb.NewKVClient(conn)
+
+	var stressEntries = []stressEntry{
+		{weight: 0.7, f: newStressPut(kvc, s.keySuffixRange, s.keySize)},
+		{
+			weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize),
+			f:      newStressPut(kvc, s.keySuffixRange, s.keyLargeSize),
+		},
+		{weight: 0.07, f: newStressRange(kvc, s.keySuffixRange)},
+		{weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange)},
+		{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
+		{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
+	}
+	s.stressTable = createStressTable(stressEntries)
+
+	for i := 0; i < s.N; i++ {
+		go s.run(ctx)
+	}
+
+	plog.Infof("keyStresser %q is started", s.Endpoint)
+	return nil
+}
+
+func (s *keyStresser) run(ctx context.Context) {
+	defer s.wg.Done()
+
+	for {
+		if err := s.rateLimiter.Wait(ctx); err == context.Canceled {
+			return
+		}
+
+		// TODO: a 10-second timeout is enough to cover leader failure
+		// and immediate leader election. Find out what other cases
+		// could cause timeouts.
+		sctx, scancel := context.WithTimeout(ctx, 10*time.Second)
+		err := s.stressTable.choose()(sctx)
+		scancel()
+		if err == nil {
+			s.mu.Lock()
+			s.success++
+			s.mu.Unlock()
+			continue
+		}
+
+		s.mu.Lock()
+		s.failure++
+		s.mu.Unlock()
+		switch grpc.ErrorDesc(err) {
+		case context.DeadlineExceeded.Error():
+			// This retries when a request is triggered at the same time as
+			// a leader failure. When we terminate the leader, requests to
+			// that leader cannot be processed and time out; requests to
+			// followers cannot be forwarded to the old leader, so they time
+			// out as well. We want to keep stressing until the cluster
+			// elects a new leader and starts processing requests again.
+		case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
+			// This retries when a request is triggered at the same time as
+			// a leader failure and follower nodes receive timeout errors
+			// after losing their leader. Followers should retry connecting
+			// to the new leader.
+		case etcdserver.ErrStopped.Error():
+			// one of the etcd nodes stopped from failure injection
+		case transport.ErrConnClosing.Desc:
+			// server closed the transport (failure injected node)
+		case rpctypes.ErrNotCapable.Error():
+			// capability check has not been done (in the beginning)
+		case rpctypes.ErrTooManyRequests.Error():
+			// hitting the recovering member.
+		case context.Canceled.Error():
+			// from stresser.Cancel method:
+			return
+		case grpc.ErrClientConnClosing.Error():
+			// from stresser.Cancel method:
+			return
+		default:
+			su, fa := s.Report()
+			plog.Errorf("keyStresser %v (success %d, failure %d) exited with error (%v)", s.Endpoint, su, fa, err)
+			return
+		}
+	}
+}
+
+func (s *keyStresser) Cancel() {
+	s.cancel()
+	s.conn.Close()
+	s.wg.Wait()
+	plog.Infof("keyStresser %q is canceled", s.Endpoint)
+}
+
+func (s *keyStresser) Report() (int, int) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.success, s.failure
+}
+
+func (s *keyStresser) Checker() Checker { return nil }
+
+type stressFunc func(ctx context.Context) error
+
+type stressEntry struct {
+	weight float32
+	f      stressFunc
+}
+
+type stressTable struct {
+	entries    []stressEntry
+	sumWeights float32
+}
+
+func createStressTable(entries []stressEntry) *stressTable {
+	st := stressTable{entries: entries}
+	for _, entry := range st.entries {
+		st.sumWeights += entry.weight
+	}
+	return &st
+}
+
+func (st *stressTable) choose() stressFunc {
+	v := rand.Float32() * st.sumWeights
+	var sum float32
+	var idx int
+	for i := range st.entries {
+		sum += st.entries[i].weight
+		if sum >= v {
+			idx = i
+			break
+		}
+	}
+	return st.entries[idx].f
+}
+
+func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
+	return func(ctx context.Context) error {
+		_, err := kvc.Put(ctx, &pb.PutRequest{
+			Key:   []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
+			Value: randBytes(keySize),
+		}, grpc.FailFast(false))
+		return err
+	}
+}
+
+func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
+	return func(ctx context.Context) error {
+		_, err := kvc.Range(ctx, &pb.RangeRequest{
+			Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
+		}, grpc.FailFast(false))
+		return err
+	}
+}
+
+func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
+	return func(ctx context.Context) error {
+		start := rand.Intn(keySuffixRange)
+		end := start + 500
+		_, err := kvc.Range(ctx, &pb.RangeRequest{
+			Key:      []byte(fmt.Sprintf("foo%016x", start)),
+			RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
+		}, grpc.FailFast(false))
+		return err
+	}
+}
+
+func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc {
+	return func(ctx context.Context) error {
+		_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
+			Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
+		}, grpc.FailFast(false))
+		return err
+	}
+}
+
+func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
+	return func(ctx context.Context) error {
+		start := rand.Intn(keySuffixRange)
+		end := start + 500
+		_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
+			Key:      []byte(fmt.Sprintf("foo%016x", start)),
+			RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
+		}, grpc.FailFast(false))
+		return err
+	}
+}

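createStressTable and choose implement weighted random selection: choose draws v uniformly from [0, sumWeights) and walks the entries until the running sum reaches v, so each entry is picked with probability weight/sumWeights. A tiny standalone demo of the same table (entry labels and weights are ours, loosely echoing the stressEntry weights above):

	package main

	import (
		"fmt"
		"math/rand"
	)

	type entry struct {
		weight float32
		label  string
	}

	// choose picks a label with probability weight/sum(weights).
	func choose(entries []entry) string {
		var sum, total float32
		for _, e := range entries {
			total += e.weight
		}
		v := rand.Float32() * total
		for _, e := range entries {
			sum += e.weight
			if sum >= v {
				return e.label
			}
		}
		return entries[len(entries)-1].label
	}

	func main() {
		table := []entry{{0.7, "put"}, {0.07, "range"}, {0.07, "delete"}}
		counts := map[string]int{}
		for i := 0; i < 10000; i++ {
			counts[choose(table)]++
		}
		fmt.Println(counts) // "put" dominates, roughly 0.7/0.84 of draws
	}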
+ 4 - 39
tools/functional-tester/etcd-tester/lease_stresser.go

@@ -32,16 +32,8 @@ const (
 	// time to live for lease
 	TTL      = 120
 	TTLShort = 2
-	// leasesStressRoundPs indicates the rate that leaseStresser.run() creates and deletes leases per second
-	leasesStressRoundPs = 1
 )
 
-type leaseStressConfig struct {
-	numLeases    int
-	keysPerLease int
-	qps          int
-}
-
 type leaseStresser struct {
 	endpoint string
 	cancel   func()
@@ -110,36 +102,6 @@ func (al *atomicLeases) getLeasesMap() map[int64]time.Time {
 	return leasesCopy
 }
 
-type leaseStresserBuilder func(m *member) Stresser
-
-func newLeaseStresserBuilder(s string, lsConfig *leaseStressConfig) leaseStresserBuilder {
-	// TODO: probably need to combine newLeaseStresserBuilder with newStresserBuilder to have a unified stresser builder.
-	switch s {
-	case "nop":
-		return func(*member) Stresser {
-			return &nopStresser{
-				start: time.Now(),
-				qps:   lsConfig.qps,
-			}
-		}
-	case "default":
-		return func(mem *member) Stresser {
-			// limit lease stresser to run 1 round per second
-			l := rate.NewLimiter(rate.Limit(leasesStressRoundPs), leasesStressRoundPs)
-			return &leaseStresser{
-				endpoint:     mem.grpcAddr(),
-				numLeases:    lsConfig.numLeases,
-				keysPerLease: lsConfig.keysPerLease,
-				rateLimiter:  l,
-			}
-		}
-	default:
-		plog.Panicf("unknown stresser type: %s\n", s)
-	}
-	// never reach here
-	return nil
-}
-
 func (ls *leaseStresser) setupOnce() error {
 	if ls.aliveLeases != nil {
 		return nil
@@ -185,7 +147,8 @@ func (ls *leaseStresser) run() {
 	defer ls.runWg.Done()
 	ls.restartKeepAlives()
 	for {
-		if err := ls.rateLimiter.Wait(ls.ctx); err == context.Canceled {
+		err := ls.rateLimiter.WaitN(ls.ctx, ls.numLeases*ls.keysPerLease)
+		if err == context.Canceled {
 			return
 		}
 		plog.Debugf("creating lease on %v", ls.endpoint)
@@ -437,3 +400,5 @@ func (ls *leaseStresser) Cancel() {
 func (ls *leaseStresser) Report() (int, int) {
 	return ls.success, ls.failure
 }
+
+func (ls *leaseStresser) Checker() Checker { return &leaseChecker{ls} }

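The run loop now charges the shared limiter for a whole round up front: WaitN(ctx, numLeases*keysPerLease) blocks until that many tokens are available, so lease traffic and key traffic draw from one global budget. A minimal sketch of the WaitN semantics (rates and counts are arbitrary):

	package main

	import (
		"context"
		"fmt"
		"time"

		"golang.org/x/time/rate"
	)

	func main() {
		// 100 tokens/sec with a burst of 100 -- shared by all callers.
		limiter := rate.NewLimiter(rate.Limit(100), 100)
		ctx := context.Background()
		start := time.Now()
		for i := 0; i < 3; i++ {
			// Each round consumes 100 tokens (e.g. 10 leases * 10 keys),
			// so rounds settle at roughly one per second.
			if err := limiter.WaitN(ctx, 100); err != nil {
				return
			}
			fmt.Printf("round %d at %v\n", i, time.Since(start).Round(100*time.Millisecond))
		}
	}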
+ 15 - 21
tools/functional-tester/etcd-tester/main.go

@@ -23,6 +23,7 @@ import (
 
 	"github.com/coreos/pkg/capnslog"
 	"github.com/prometheus/client_golang/prometheus"
+	"golang.org/x/time/rate"
 )
 
 var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcd-tester")
@@ -47,8 +48,7 @@ func main() {
 	stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
 	schedCases := flag.String("schedule-cases", "", "test case schedule")
 	consistencyCheck := flag.Bool("consistency-check", true, "true to check consistency (revision, hash)")
-	isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.")
-	stresserType := flag.String("stresser", "default", "specify stresser (\"default\" or \"nop\").")
+	stresserType := flag.String("stresser", "keys,lease", "comma separated list of stressers (keys, lease, v2keys, nop).")
 	failureTypes := flag.String("failures", "default,failpoints", "specify failures (concat of \"default\" and \"failpoints\").")
 	externalFailures := flag.String("external-failures", "", "specify a path of script for enabling/disabling an external fault injector")
 	flag.Parse()
@@ -67,11 +67,7 @@ func main() {
 		agents[i].datadir = *datadir
 	}
 
-	c := &cluster{
-		agents: agents,
-		v2Only: *isV2Only,
-	}
-
+	c := &cluster{agents: agents}
 	if err := c.bootstrap(); err != nil {
 		plog.Fatal(err)
 	}
@@ -113,25 +109,23 @@ func main() {
 		}
 	}
 
-	sConfig := &stressConfig{
-		qps:            *stressQPS,
+	scfg := stressConfig{
+		rateLimiter:    rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
 		keyLargeSize:   int(*stressKeyLargeSize),
 		keySize:        int(*stressKeySize),
 		keySuffixRange: int(*stressKeySuffixRange),
-		v2:             *isV2Only,
-	}
-	lsConfig := &leaseStressConfig{
-		numLeases:    10,
-		keysPerLease: 10,
-		qps:          *stressQPS,
+		numLeases:      10,
+		keysPerLease:   10,
 	}
+
 	t := &tester{
-		failures:             schedule,
-		cluster:              c,
-		limit:                *limit,
-		stressBuilder:        newStressBuilder(*stresserType, sConfig),
-		leaseStresserBuilder: newLeaseStresserBuilder(*stresserType, lsConfig),
-		consistencyCheck:     *consistencyCheck,
+		failures: schedule,
+		cluster:  c,
+		limit:    *limit,
+
+		scfg:         scfg,
+		stresserType: *stresserType,
+		doChecks:     *consistencyCheck,
 	}
 
 	sh := statusHandler{status: &t.status}

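With this change the QPS budget becomes global: main builds a single rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS) in stressConfig and every stresser built from scfg shares it, replacing the limiters the individual builders used to create, so stress-qps now bounds the whole tester rather than each stresser independently.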
+ 13 - 14
tools/functional-tester/etcd-tester/member.go

@@ -23,7 +23,6 @@ import (
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 
-	clientv2 "github.com/coreos/etcd/client"
 	"github.com/coreos/etcd/clientv3"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
@@ -105,6 +104,19 @@ func (m *member) RevHash() (int64, int64, error) {
 	return resp.Header.Revision, int64(resp.Hash), nil
 }
 
+func (m *member) Rev(ctx context.Context) (int64, error) {
+	cli, err := m.newClientV3()
+	if err != nil {
+		return 0, err
+	}
+	defer cli.Close()
+	resp, err := cli.Status(ctx, m.ClientURL)
+	if err != nil {
+		return 0, err
+	}
+	return resp.Header.Revision, nil
+}
+
 func (m *member) IsLeader() (bool, error) {
 	cli, err := m.newClientV3()
 	if err != nil {
@@ -134,19 +146,6 @@ func (m *member) SetHealthKeyV3() error {
 	return nil
 }
 
-func (m *member) SetHealthKeyV2() error {
-	cfg := clientv2.Config{Endpoints: []string{m.ClientURL}}
-	c, err := clientv2.New(cfg)
-	if err != nil {
-		return err
-	}
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-	kapi := clientv2.NewKeysAPI(c)
-	_, err = kapi.Set(ctx, "health", "good", nil)
-	cancel()
-	return err
-}
-
 func (m *member) newClientV3() (*clientv3.Client, error) {
 	return clientv3.New(clientv3.Config{
 		Endpoints:   []string{m.ClientURL},

+ 89 - 365
tools/functional-tester/etcd-tester/stresser.go

@@ -15,113 +15,15 @@
 package main
 
 import (
-	"fmt"
-	"math/rand"
-	"net"
-	"net/http"
+	"strings"
 	"sync"
 	"time"
 
-	clientV2 "github.com/coreos/etcd/client"
-	"github.com/coreos/etcd/etcdserver"
-	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
-	"golang.org/x/net/context"
 	"golang.org/x/time/rate"
-	"google.golang.org/grpc"
 	"google.golang.org/grpc/grpclog"
-	"google.golang.org/grpc/transport"
 )
 
-func init() {
-	grpclog.SetLogger(plog)
-}
-
-type stressFunc func(ctx context.Context) error
-
-type stressEntry struct {
-	weight float32
-	f      stressFunc
-}
-
-type stressTable struct {
-	entries    []stressEntry
-	sumWeights float32
-}
-
-func createStressTable(entries []stressEntry) *stressTable {
-	st := stressTable{entries: entries}
-	for _, entry := range st.entries {
-		st.sumWeights += entry.weight
-	}
-	return &st
-}
-
-func (st *stressTable) choose() stressFunc {
-	v := rand.Float32() * st.sumWeights
-	var sum float32
-	var idx int
-	for i := range st.entries {
-		sum += st.entries[i].weight
-		if sum >= v {
-			idx = i
-			break
-		}
-	}
-	return st.entries[idx].f
-}
-
-func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
-	return func(ctx context.Context) error {
-		_, err := kvc.Put(ctx, &pb.PutRequest{
-			Key:   []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
-			Value: randBytes(keySize),
-		}, grpc.FailFast(false))
-		return err
-	}
-}
-
-func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
-	return func(ctx context.Context) error {
-		_, err := kvc.Range(ctx, &pb.RangeRequest{
-			Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
-		}, grpc.FailFast(false))
-		return err
-	}
-}
-
-func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
-	return func(ctx context.Context) error {
-		start := rand.Intn(keySuffixRange)
-		end := start + 500
-		_, err := kvc.Range(ctx, &pb.RangeRequest{
-			Key:      []byte(fmt.Sprintf("foo%016x", start)),
-			RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
-		}, grpc.FailFast(false))
-		return err
-	}
-}
-
-func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc {
-	return func(ctx context.Context) error {
-		_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
-			Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
-		}, grpc.FailFast(false))
-		return err
-	}
-}
-
-func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
-	return func(ctx context.Context) error {
-		start := rand.Intn(keySuffixRange)
-		end := start + 500
-		_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
-			Key:      []byte(fmt.Sprintf("foo%016x", start)),
-			RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
-		}, grpc.FailFast(false))
-		return err
-	}
-}
+func init() { grpclog.SetLogger(plog) }
 
 type Stresser interface {
 	// Stress starts to stress the etcd cluster
@@ -130,305 +32,127 @@ type Stresser interface {
 	Cancel()
 	// Report reports the success and failure of the stress test
 	Report() (success int, failure int)
+	// Checker returns an invariant checker to run after the stresser is canceled.
+	Checker() Checker
 }
 
-type stresser struct {
-	Endpoint string
-
-	keyLargeSize   int
-	keySize        int
-	keySuffixRange int
-
-	N int
-
-	mu sync.Mutex
-	wg *sync.WaitGroup
-
-	rateLimiter *rate.Limiter
-
-	cancel func()
-	conn   *grpc.ClientConn
-
-	success int
-	failure int
-
-	stressTable *stressTable
+// nopStresser implements a Stresser that does nothing.
+type nopStresser struct {
+	start time.Time
+	qps   int
 }
 
-func (s *stresser) Stress() error {
-	if s.rateLimiter == nil {
-		panic("expect rateLimiter to be set")
-	}
-
-	// TODO: add backoff option
-	conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure())
-	if err != nil {
-		return fmt.Errorf("%v (%s)", err, s.Endpoint)
-	}
-	ctx, cancel := context.WithCancel(context.Background())
-
-	wg := &sync.WaitGroup{}
-	wg.Add(s.N)
-
-	s.mu.Lock()
-	s.conn = conn
-	s.cancel = cancel
-	s.wg = wg
-	s.mu.Unlock()
-
-	kvc := pb.NewKVClient(conn)
-
-	var stressEntries = []stressEntry{
-		{weight: 0.7, f: newStressPut(kvc, s.keySuffixRange, s.keySize)},
-		{
-			weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize),
-			f:      newStressPut(kvc, s.keySuffixRange, s.keyLargeSize),
-		},
-		{weight: 0.07, f: newStressRange(kvc, s.keySuffixRange)},
-		{weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange)},
-		{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
-		{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
-	}
-	s.stressTable = createStressTable(stressEntries)
-
-	for i := 0; i < s.N; i++ {
-		go s.run(ctx)
-	}
-
-	plog.Printf("stresser %q is started", s.Endpoint)
-	return nil
+func (s *nopStresser) Stress() error { return nil }
+func (s *nopStresser) Cancel()       {}
+func (s *nopStresser) Report() (int, int) {
+	return int(time.Since(s.start).Seconds()) * s.qps, 0
 }
+func (s *nopStresser) Checker() Checker { return nil }
 
-func (s *stresser) run(ctx context.Context) {
-	defer s.wg.Done()
-
-	for {
-		if err := s.rateLimiter.Wait(ctx); err == context.Canceled {
-			return
-		}
-
-		// TODO: 10-second is enough timeout to cover leader failure
-		// and immediate leader election. Find out what other cases this
-		// could be timed out.
-		sctx, scancel := context.WithTimeout(ctx, 10*time.Second)
-
-		err := s.stressTable.choose()(sctx)
-
-		scancel()
-
-		if err != nil {
-			s.mu.Lock()
-			s.failure++
-			s.mu.Unlock()
-
-			switch grpc.ErrorDesc(err) {
-			case context.DeadlineExceeded.Error():
-				// This retries when request is triggered at the same time as
-				// leader failure. When we terminate the leader, the request to
-				// that leader cannot be processed, and times out. Also requests
-				// to followers cannot be forwarded to the old leader, so timing out
-				// as well. We want to keep stressing until the cluster elects a
-				// new leader and start processing requests again.
-				continue
-
-			case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
-				// This retries when request is triggered at the same time as
-				// leader failure and follower nodes receive time out errors
-				// from losing their leader. Followers should retry to connect
-				// to the new leader.
-				continue
-
-			case etcdserver.ErrStopped.Error():
-				// one of the etcd nodes stopped from failure injection
-				continue
-
-			case transport.ErrConnClosing.Desc:
-				// server closed the transport (failure injected node)
-				continue
-
-			case rpctypes.ErrNotCapable.Error():
-				// capability check has not been done (in the beginning)
-				continue
-
-			case rpctypes.ErrTooManyRequests.Error():
-				// hitting the recovering member.
-				continue
-
-			case context.Canceled.Error():
-				// from stresser.Cancel method:
-				return
+// compositeStresser implements a Stresser that runs a slice of
+// stressers concurrently.
+type compositeStresser struct {
+	stressers []Stresser
+}
 
-			case grpc.ErrClientConnClosing.Error():
-				// from stresser.Cancel method:
-				return
+func (cs *compositeStresser) Stress() error {
+	for i, s := range cs.stressers {
+		if err := s.Stress(); err != nil {
+			for j := 0; j < i; j++ {
+				cs.stressers[j].Cancel()
 			}
-
-			su, fa := s.Report()
-			plog.Warningf("stresser %v (success %d, failure %d) exited with error (%v)", s.Endpoint, su, fa, err)
-
-			return
+			return err
 		}
-
-		s.mu.Lock()
-		s.success++
-		s.mu.Unlock()
 	}
+	return nil
 }
 
-func (s *stresser) Cancel() {
-	s.mu.Lock()
-	s.cancel()
-	s.conn.Close()
-	wg := s.wg
-	s.mu.Unlock()
-
+func (cs *compositeStresser) Cancel() {
+	var wg sync.WaitGroup
+	wg.Add(len(cs.stressers))
+	for i := range cs.stressers {
+		go func(s Stresser) {
+			defer wg.Done()
+			s.Cancel()
+		}(cs.stressers[i])
+	}
 	wg.Wait()
-	plog.Printf("stresser %q is canceled", s.Endpoint)
-}
-
-func (s *stresser) Report() (int, int) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	return s.success, s.failure
-}
-
-type stresserV2 struct {
-	Endpoint string
-
-	keySize        int
-	keySuffixRange int
-
-	N int
-
-	mu      sync.Mutex
-	failure int
-	success int
-
-	cancel func()
 }
 
-func (s *stresserV2) Stress() error {
-	cfg := clientV2.Config{
-		Endpoints: []string{s.Endpoint},
-		Transport: &http.Transport{
-			Dial: (&net.Dialer{
-				Timeout:   time.Second,
-				KeepAlive: 30 * time.Second,
-			}).Dial,
-			MaxIdleConnsPerHost: s.N,
-		},
-	}
-	c, err := clientV2.New(cfg)
-	if err != nil {
-		return err
-	}
-
-	kv := clientV2.NewKeysAPI(c)
-	ctx, cancel := context.WithCancel(context.Background())
-	s.cancel = cancel
-
-	for i := 0; i < s.N; i++ {
-		go func() {
-			for {
-				setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout)
-				key := fmt.Sprintf("foo%016x", rand.Intn(s.keySuffixRange))
-				_, err := kv.Set(setctx, key, string(randBytes(s.keySize)), nil)
-				setcancel()
-				if err == context.Canceled {
-					return
-				}
-				s.mu.Lock()
-				if err != nil {
-					s.failure++
-				} else {
-					s.success++
-				}
-				s.mu.Unlock()
-			}
-		}()
+func (cs *compositeStresser) Report() (succ int, fail int) {
+	for _, stress := range cs.stressers {
+		s, f := stress.Report()
+		succ += s
+		fail += f
 	}
-
-	<-ctx.Done()
-	return nil
+	return succ, fail
 }
 
-func (s *stresserV2) Cancel() {
-	s.cancel()
-}
-
-func (s *stresserV2) Report() (success int, failure int) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	return s.success, s.failure
-}
-
-func randBytes(size int) []byte {
-	data := make([]byte, size)
-	for i := 0; i < size; i++ {
-		data[i] = byte(int('a') + rand.Intn(26))
+func (cs *compositeStresser) Checker() Checker {
+	var chks []Checker
+	for _, s := range cs.stressers {
+		if chk := s.Checker(); chk != nil {
+			chks = append(chks, chk)
+		}
 	}
-	return data
-}
-
-// nopStresser implements Stresser that does nothing
-type nopStresser struct {
-	start time.Time
-	qps   int
-}
-
-func (s *nopStresser) Stress() error { return nil }
-func (s *nopStresser) Cancel()       {}
-func (s *nopStresser) Report() (int, int) {
-	return int(time.Since(s.start).Seconds()) * s.qps, 0
+	if len(chks) == 0 {
+		return nil
+	}
+	return newCompositeChecker(chks)
 }
 
 type stressConfig struct {
-	qps            int
 	keyLargeSize   int
 	keySize        int
 	keySuffixRange int
-	v2             bool
-}
 
-type stressBuilder func(m *member) Stresser
+	numLeases    int
+	keysPerLease int
+
+	rateLimiter *rate.Limiter
+}
 
-func newStressBuilder(s string, sc *stressConfig) stressBuilder {
+// NewStresser creates a stresser from a comma-separated list of stresser types.
+func NewStresser(s string, sc *stressConfig, m *member) Stresser {
+	types := strings.Split(s, ",")
+	if len(types) > 1 {
+		stressers := make([]Stresser, len(types))
+		for i, stype := range types {
+			stressers[i] = NewStresser(stype, sc, m)
+		}
+		return &compositeStresser{stressers}
+	}
 	switch s {
 	case "nop":
-		return func(*member) Stresser {
-			return &nopStresser{
-				start: time.Now(),
-				qps:   sc.qps,
-			}
-		}
-	case "default":
+		return &nopStresser{start: time.Now(), qps: int(sc.rateLimiter.Limit())}
+	case "keys":
 		// TODO: Too intensive stressers can panic etcd member with
 		// 'out of memory' error. Put rate limits in server side.
-		stressN := 100
-		l := rate.NewLimiter(rate.Limit(sc.qps), sc.qps)
-
-		return func(m *member) Stresser {
-			if sc.v2 {
-				return &stresserV2{
-					Endpoint:       m.ClientURL,
-					keySize:        sc.keySize,
-					keySuffixRange: sc.keySuffixRange,
-					N:              stressN,
-				}
-			} else {
-				return &stresser{
-					Endpoint:       m.grpcAddr(),
-					keyLargeSize:   sc.keyLargeSize,
-					keySize:        sc.keySize,
-					keySuffixRange: sc.keySuffixRange,
-					N:              stressN,
-					rateLimiter:    l,
-				}
-			}
+		return &keyStresser{
+			Endpoint:       m.grpcAddr(),
+			keyLargeSize:   sc.keyLargeSize,
+			keySize:        sc.keySize,
+			keySuffixRange: sc.keySuffixRange,
+			N:              100,
+			rateLimiter:    sc.rateLimiter,
+		}
+	case "v2keys":
+		return &v2Stresser{
+			Endpoint:       m.ClientURL,
+			keySize:        sc.keySize,
+			keySuffixRange: sc.keySuffixRange,
+			N:              100,
+			rateLimiter:    sc.rateLimiter,
+		}
+	case "lease":
+		return &leaseStresser{
+			endpoint:     m.grpcAddr(),
+			numLeases:    sc.numLeases,
+			keysPerLease: sc.keysPerLease,
+			rateLimiter:  sc.rateLimiter,
 		}
 	default:
 		plog.Panicf("unknown stresser type: %s\n", s)
 	}
-
 	return nil // never reach here
 }

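Since NewStresser recurses on the comma-separated list, the default "keys,lease" value yields a compositeStresser whose Checker() in turn composes the lease checkers. A hedged usage sketch (assumes a *member m is in scope; the config values are arbitrary, not the tester defaults):

	scfg := stressConfig{
		keyLargeSize:   32 * 1024, // sketch values only
		keySize:        100,
		keySuffixRange: 250000,
		numLeases:      10,
		keysPerLease:   10,
		rateLimiter:    rate.NewLimiter(rate.Limit(10000), 10000),
	}
	s := NewStresser("keys,lease", &scfg, m) // composite of keyStresser + leaseStresser
	if err := s.Stress(); err != nil {
		plog.Fatal(err)
	}
	// ... inject failures ...
	s.Cancel()
	// keyStresser has no checker, so only lease invariants remain.
	if chk := s.Checker(); chk != nil {
		if err := chk.Check(); err != nil {
			plog.Errorf("invariant violated: %v", err)
		}
	}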
+ 45 - 65
tools/functional-tester/etcd-tester/tester.go

@@ -20,17 +20,19 @@ import (
 )
 
 type tester struct {
+	cluster *cluster
+	limit   int
+
 	failures        []failure
-	cluster         *cluster
-	limit           int
 	status          Status
 	currentRevision int64
 
-	Stressers            []Stresser
-	stressBuilder        stressBuilder
-	leaseStresserBuilder leaseStresserBuilder
-	Checker              Checker
-	consistencyCheck     bool
+	stresserType string
+	scfg         stressConfig
+	doChecks     bool
+
+	stresser Stresser
+	checker  Checker
 }
 
 // compactQPS is rough number of compact requests per second.
@@ -45,7 +47,10 @@ func (tt *tester) runLoop() {
 		tt.status.Failures = append(tt.status.Failures, f.Desc())
 	}
 
-	tt.setupStressers()
+	if err := tt.resetStressCheck(); err != nil {
+		plog.Errorf("%s failed to start stresser (%v)", err)
+		return
+	}
 
 	var prevCompactRev int64
 	for round := 0; round < tt.limit || tt.limit == -1; round++ {
@@ -90,27 +95,6 @@ func (tt *tester) runLoop() {
 	plog.Printf("%s functional-tester is finished", tt.logPrefix())
 }
 
-func (tt *tester) setupStressers() {
-	tt.Stressers = make([]Stresser, 0)
-	leaseStressers := make([]Stresser, 0, 2*len(tt.cluster.Members))
-	for i, m := range tt.cluster.Members {
-		lStresser := tt.leaseStresserBuilder(m)
-		leaseStressers[i] = lStresser
-		tt.Stressers = append(tt.Stressers, tt.stressBuilder(m), lStresser)
-	}
-	for i := range tt.Stressers {
-		go tt.Stressers[i].Stress()
-	}
-	if !tt.consistencyCheck || tt.cluster.v2Only {
-		tt.Checker = newNoChecker()
-		return
-	}
-	tt.Checker = newCompositeChecker([]Checker{
-		newHashChecker(hashAndRevGetter(tt.cluster)),
-		newLeaseChecker(leaseStressers)},
-	)
-}
-
 func (tt *tester) doRound(round int) error {
 	for j, f := range tt.failures {
 		caseTotalCounter.WithLabelValues(f.Desc()).Inc()
@@ -129,26 +113,21 @@ func (tt *tester) doRound(round int) error {
 		if err := f.Recover(tt.cluster, round); err != nil {
 			return fmt.Errorf("recovery error: %v", err)
 		}
+		plog.Printf("%s recovered failure", tt.logPrefix())
+		tt.cancelStresser()
 		plog.Printf("%s wait until cluster is healthy", tt.logPrefix())
 		if err := tt.cluster.WaitHealth(); err != nil {
 			return fmt.Errorf("wait full health error: %v", err)
 		}
-		plog.Printf("%s recovered failure", tt.logPrefix())
-
 		if err := tt.checkConsistency(); err != nil {
 			return fmt.Errorf("tt.checkConsistency error (%v)", err)
 		}
-
 		plog.Printf("%s succeed!", tt.logPrefix())
 	}
 	return nil
 }
 
 func (tt *tester) updateRevision() error {
-	if tt.cluster.v2Only {
-		return nil
-	}
-
 	revs, _, err := tt.cluster.getRevisionHash()
 	for _, rev := range revs {
 		tt.currentRevision = rev
@@ -160,7 +139,6 @@ func (tt *tester) updateRevision() error {
 }
 
 func (tt *tester) checkConsistency() (err error) {
-	tt.cancelStressers()
 	defer func() {
 		if err != nil {
 			return
@@ -169,20 +147,19 @@ func (tt *tester) checkConsistency() (err error) {
 			plog.Warningf("%s functional-tester returning with tt.updateRevision error (%v)", tt.logPrefix(), err)
 			return
 		}
-		err = tt.startStressers()
+		err = tt.startStresser()
 	}()
-	if err = tt.Checker.Check(); err != nil {
+	if err = tt.checker.Check(); err != nil {
 		plog.Printf("%s %v", tt.logPrefix(), err)
 	}
-
 	return err
 }
 
 func (tt *tester) compact(rev int64, timeout time.Duration) (err error) {
-	tt.cancelStressers()
+	tt.cancelStresser()
 	defer func() {
 		if err == nil {
-			err = tt.startStressers()
+			err = tt.startStresser()
 		}
 	}()
 
@@ -211,7 +188,6 @@ func (tt *tester) defrag() error {
 		}
 		return err
 	}
-
 	plog.Printf("%s defragmented...", tt.logPrefix())
 	return nil
 }
@@ -236,45 +212,49 @@ func (tt *tester) cleanup() error {
 	}
 	caseFailedTotalCounter.WithLabelValues(desc).Inc()
 
-	tt.cancelStressers()
-	plog.Printf("%s cleaning up...", tt.logPrefix())
+	tt.cancelStresser()
 	if err := tt.cluster.Cleanup(); err != nil {
 		plog.Warningf("%s cleanup error: %v", tt.logPrefix(), err)
 		return err
 	}
-
 	if err := tt.cluster.Reset(); err != nil {
 		plog.Warningf("%s cleanup Bootstrap error: %v", tt.logPrefix(), err)
 		return err
 	}
-
-	return nil
+	return tt.resetStressCheck()
 }
 
-func (tt *tester) cancelStressers() {
+func (tt *tester) cancelStresser() {
 	plog.Printf("%s canceling the stressers...", tt.logPrefix())
-	for _, s := range tt.Stressers {
-		s.Cancel()
-	}
+	tt.stresser.Cancel()
 	plog.Printf("%s canceled stressers", tt.logPrefix())
 }
 
-func (tt *tester) startStressers() error {
+func (tt *tester) startStresser() (err error) {
 	plog.Printf("%s starting the stressers...", tt.logPrefix())
-	for _, s := range tt.Stressers {
-		if err := s.Stress(); err != nil {
-			return err
-		}
-	}
+	err = tt.stresser.Stress()
 	plog.Printf("%s started stressers", tt.logPrefix())
-	return nil
+	return err
 }
 
-func (tt *tester) Report() (success, failure int) {
-	for _, stresser := range tt.Stressers {
-		s, f := stresser.Report()
-		success += s
-		failure += f
+func (tt *tester) resetStressCheck() error {
+	plog.Infof("%s resetting stressers and checkers...", tt.logPrefix())
+	cs := &compositeStresser{}
+	for _, m := range tt.cluster.Members {
+		s := NewStresser(tt.stresserType, &tt.scfg, m)
+		cs.stressers = append(cs.stressers, s)
 	}
-	return
+	tt.stresser = cs
+	if !tt.doChecks {
+		tt.checker = newNoChecker()
+		return tt.startStresser()
+	}
+	chk := newHashChecker(hashAndRevGetter(tt.cluster))
+	if schk := cs.Checker(); schk != nil {
+		chk = newCompositeChecker([]Checker{chk, schk})
+	}
+	tt.checker = chk
+	return tt.startStresser()
 }
+
+func (tt *tester) Report() (success, failure int) { return tt.stresser.Report() }

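Note that cleanup now ends with resetStressCheck rather than leaving the old stressers in place: after every cluster reset the tester rebuilds one composite stresser across all members and derives the checker from it (the hash checker always, plus whatever Checker() the stressers supply), so stresser and checker can never drift out of sync.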
+ 16 - 0
tools/functional-tester/etcd-tester/util.go

@@ -14,6 +14,11 @@
 
 package main
 
+import (
+	"fmt"
+	"strings"
+)
+
 func getSameValue(vals map[string]int64) (int64, bool) {
 	var rv int64
 	for _, v := range vals {
@@ -33,3 +38,14 @@ func max(n1, n2 int64) int64 {
 	}
 	return n2
 }
+
+func errsToError(errs []error) error {
+	if len(errs) == 0 {
+		return nil
+	}
+	stringArr := make([]string, len(errs))
+	for i, err := range errs {
+		stringArr[i] = err.Error()
+	}
+	return errors.New(strings.Join(stringArr, ", "))
+}

+ 120 - 0
tools/functional-tester/etcd-tester/v2_stresser.go

@@ -0,0 +1,120 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net"
+	"net/http"
+	"sync"
+	"time"
+
+	"golang.org/x/time/rate"
+
+	clientV2 "github.com/coreos/etcd/client"
+)
+
+type v2Stresser struct {
+	Endpoint string
+
+	keySize        int
+	keySuffixRange int
+
+	N int
+
+	rateLimiter *rate.Limiter
+
+	wg sync.WaitGroup
+
+	mu      sync.Mutex
+	failure int
+	success int
+
+	cancel func()
+}
+
+func (s *v2Stresser) Stress() error {
+	cfg := clientV2.Config{
+		Endpoints: []string{s.Endpoint},
+		Transport: &http.Transport{
+			Dial: (&net.Dialer{
+				Timeout:   time.Second,
+				KeepAlive: 30 * time.Second,
+			}).Dial,
+			MaxIdleConnsPerHost: s.N,
+		},
+	}
+	c, err := clientV2.New(cfg)
+	if err != nil {
+		return err
+	}
+
+	kv := clientV2.NewKeysAPI(c)
+	ctx, cancel := context.WithCancel(context.Background())
+	s.cancel = cancel
+	s.wg.Add(s.N)
+	for i := 0; i < s.N; i++ {
+		go func() {
+			defer s.wg.Done()
+			s.run(ctx, kv)
+		}()
+	}
+	return nil
+}
+
+func (s *v2Stresser) run(ctx context.Context, kv clientV2.KeysAPI) {
+	for {
+		if err := s.rateLimiter.Wait(ctx); err == context.Canceled {
+			return
+		}
+		setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout)
+		key := fmt.Sprintf("foo%016x", rand.Intn(s.keySuffixRange))
+		_, err := kv.Set(setctx, key, string(randBytes(s.keySize)), nil)
+		setcancel()
+		if err == context.Canceled {
+			return
+		}
+		s.mu.Lock()
+		if err != nil {
+			s.failure++
+		} else {
+			s.success++
+		}
+		s.mu.Unlock()
+	}
+}
+
+func (s *v2Stresser) Cancel() {
+	s.cancel()
+	s.wg.Wait()
+}
+
+func (s *v2Stresser) Report() (success int, failure int) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.success, s.failure
+}
+
+func (s *v2Stresser) Checker() Checker { return nil }
+
+func randBytes(size int) []byte {
+	data := make([]byte, size)
+	for i := 0; i < size; i++ {
+		data[i] = byte(int('a') + rand.Intn(26))
+	}
+	return data
+}
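
Unlike the old stresserV2, which looped as fast as the client allowed, v2Stresser draws from the same shared rate.Limiter as the v3 stressers via rateLimiter.Wait, so enabling "v2keys" cannot blow past the global stress-qps budget.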