Browse Source

Merge pull request #9525 from gyuho/ft

functional-tester: use "clientv3" for stressers
Gyuho Lee 7 years ago
parent
commit
d7cf2cc03f

+ 22 - 5
tools/functional-tester/rpcpb/member.go

@@ -41,15 +41,17 @@ func (m *Member) DialEtcdGRPCServer(opts ...grpc.DialOption) (*grpc.ClientConn,
 }
 
 // CreateEtcdClient creates a client from member.
-func (m *Member) CreateEtcdClient() (*clientv3.Client, error) {
+func (m *Member) CreateEtcdClient(opts ...grpc.DialOption) (*clientv3.Client, error) {
+	cfg := clientv3.Config{
+		Endpoints:   []string{m.EtcdClientEndpoint},
+		DialTimeout: 5 * time.Second,
+		DialOptions: opts,
+	}
 	if m.EtcdClientTLS {
 		// TODO: support TLS
 		panic("client TLS not supported yet")
 	}
-	return clientv3.New(clientv3.Config{
-		Endpoints:   []string{m.EtcdClientEndpoint},
-		DialTimeout: 5 * time.Second,
-	})
+	return clientv3.New(cfg)
 }
 
 // CheckCompact ensures that historical data before given revision has been compacted.
@@ -124,6 +126,21 @@ func (m *Member) Rev(ctx context.Context) (int64, error) {
 	return resp.Header.Revision, nil
 }
 
+// Compact compacts member storage with given revision.
+// It blocks until it's physically done.
+func (m *Member) Compact(rev int64, timeout time.Duration) error {
+	cli, err := m.CreateEtcdClient()
+	if err != nil {
+		return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
+	}
+	defer cli.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	_, err = cli.Compact(ctx, rev, clientv3.WithCompactPhysical())
+	cancel()
+	return err
+}
+
 // IsLeader returns true if this member is the current cluster leader.
 func (m *Member) IsLeader() (bool, error) {
 	cli, err := m.CreateEtcdClient()

+ 14 - 16
tools/functional-tester/tester/checks.go

@@ -21,7 +21,6 @@ import (
 
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
 
 	"go.uber.org/zap"
@@ -101,22 +100,20 @@ type leaseChecker struct {
 	lg  *zap.Logger
 	m   *rpcpb.Member
 	ls  *leaseStresser
-	lsc pb.LeaseClient
-	kvc pb.KVClient
+	cli *clientv3.Client
 }
 
 func (lc *leaseChecker) Check() error {
-	conn, err := lc.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(time.Second))
+	cli, err := lc.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(time.Second))
 	if err != nil {
 		return fmt.Errorf("%v (%q)", err, lc.m.EtcdClientEndpoint)
 	}
 	defer func() {
-		if conn != nil {
-			conn.Close()
+		if cli != nil {
+			cli.Close()
 		}
 	}()
-	lc.kvc = pb.NewKVClient(conn)
-	lc.lsc = pb.NewLeaseClient(conn)
+	lc.cli = cli
 	if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil {
 		return err
 	}
@@ -148,7 +145,7 @@ func (lc *leaseChecker) checkShortLivedLeases() error {
 
 func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) (err error) {
 	// retry in case of transient failure or lease is expired but not yet revoked due to the fact that etcd cluster didn't have enought time to delete it.
-	var resp *pb.LeaseTimeToLiveResponse
+	var resp *clientv3.LeaseTimeToLiveResponse
 	for i := 0; i < retries; i++ {
 		resp, err = lc.getLeaseByID(ctx, leaseID)
 		// lease not found, for ~v3.1 compatibilities, check ErrLeaseNotFound
@@ -230,9 +227,13 @@ func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error {
 	return nil
 }
 
-func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) {
-	ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true}
-	return lc.lsc.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false))
+// TODO: handle failures from "grpc.FailFast(false)"
+func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*clientv3.LeaseTimeToLiveResponse, error) {
+	return lc.cli.TimeToLive(
+		ctx,
+		clientv3.LeaseID(leaseID),
+		clientv3.WithAttachedKeys(),
+	)
 }
 
 func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
@@ -261,10 +262,7 @@ func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (boo
 // Since the format of keys contains about leaseID, finding keys base on "<leaseID>" prefix
 // determines whether the attached keys for a given leaseID has been deleted or not
 func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
-	resp, err := lc.kvc.Range(ctx, &pb.RangeRequest{
-		Key:      []byte(fmt.Sprintf("%d", leaseID)),
-		RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))),
-	}, grpc.FailFast(false))
+	resp, err := lc.cli.Get(ctx, fmt.Sprintf("%d", leaseID), clientv3.WithPrefix())
 	if err != nil {
 		lc.lg.Warn(
 			"hasKeysAttachedToLeaseExpired failed",

+ 1 - 19
tools/functional-tester/tester/cluster.go

@@ -25,7 +25,6 @@ import (
 	"strings"
 	"time"
 
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/debugutil"
 	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
 
@@ -681,31 +680,14 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 	}
 
 	for i, m := range clus.Members {
-		conn, derr := m.DialEtcdGRPCServer()
-		if derr != nil {
-			clus.lg.Warn(
-				"compactKV dial failed",
-				zap.String("endpoint", m.EtcdClientEndpoint),
-				zap.Error(derr),
-			)
-			err = derr
-			continue
-		}
-		kvc := pb.NewKVClient(conn)
-
 		clus.lg.Info(
 			"compacting",
 			zap.String("endpoint", m.EtcdClientEndpoint),
 			zap.Int64("compact-revision", rev),
 			zap.Duration("timeout", timeout),
 		)
-
 		now := time.Now()
-		ctx, cancel := context.WithTimeout(context.Background(), timeout)
-		_, cerr := kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true}, grpc.FailFast(false))
-		cancel()
-
-		conn.Close()
+		cerr := m.Compact(rev, timeout)
 		succeed := true
 		if cerr != nil {
 			if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {

+ 61 - 86
tools/functional-tester/tester/stress_key.go

@@ -22,9 +22,9 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
 
 	"go.uber.org/zap"
@@ -51,7 +51,7 @@ type keyStresser struct {
 	wg sync.WaitGroup
 
 	cancel func()
-	conn   *grpc.ClientConn
+	cli    *clientv3.Client
 	// atomicModifiedKeys records the number of keys created and deleted by the stresser.
 	atomicModifiedKeys int64
 
@@ -60,35 +60,33 @@ type keyStresser struct {
 
 func (s *keyStresser) Stress() error {
 	// TODO: add backoff option
-	conn, err := s.m.DialEtcdGRPCServer()
+	cli, err := s.m.CreateEtcdClient()
 	if err != nil {
 		return fmt.Errorf("%v (%q)", err, s.m.EtcdClientEndpoint)
 	}
 	ctx, cancel := context.WithCancel(context.Background())
 
 	s.wg.Add(s.N)
-	s.conn = conn
+	s.cli = cli
 	s.cancel = cancel
 
-	kvc := pb.NewKVClient(conn)
-
 	var stressEntries = []stressEntry{
-		{weight: 0.7, f: newStressPut(kvc, s.keySuffixRange, s.keySize)},
+		{weight: 0.7, f: newStressPut(cli, s.keySuffixRange, s.keySize)},
 		{
 			weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize),
-			f:      newStressPut(kvc, s.keySuffixRange, s.keyLargeSize),
+			f:      newStressPut(cli, s.keySuffixRange, s.keyLargeSize),
 		},
-		{weight: 0.07, f: newStressRange(kvc, s.keySuffixRange)},
-		{weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange)},
-		{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
-		{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
+		{weight: 0.07, f: newStressRange(cli, s.keySuffixRange)},
+		{weight: 0.07, f: newStressRangeInterval(cli, s.keySuffixRange)},
+		{weight: 0.07, f: newStressDelete(cli, s.keySuffixRange)},
+		{weight: 0.07, f: newStressDeleteInterval(cli, s.keySuffixRange)},
 	}
 	if s.keyTxnSuffixRange > 0 {
 		// adjust to make up ±70% of workloads with writes
 		stressEntries[0].weight = 0.35
 		stressEntries = append(stressEntries, stressEntry{
 			weight: 0.35,
-			f:      newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps),
+			f:      newStressTxn(cli, s.keyTxnSuffixRange, s.keyTxnOps),
 		})
 	}
 	s.stressTable = createStressTable(stressEntries)
@@ -167,7 +165,7 @@ func (s *keyStresser) Pause() {
 
 func (s *keyStresser) Close() {
 	s.cancel()
-	s.conn.Close()
+	s.cli.Close()
 	s.wg.Wait()
 
 	s.lg.Info(
@@ -216,25 +214,26 @@ func (st *stressTable) choose() stressFunc {
 	return st.entries[idx].f
 }
 
-func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
+func newStressPut(cli *clientv3.Client, keySuffixRange, keySize int) stressFunc {
 	return func(ctx context.Context) (error, int64) {
-		_, err := kvc.Put(ctx, &pb.PutRequest{
-			Key:   []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
-			Value: randBytes(keySize),
-		}, grpc.FailFast(false))
+		_, err := cli.Put(
+			ctx,
+			fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)),
+			string(randBytes(keySize)),
+		)
 		return err, 1
 	}
 }
 
-func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc {
+func newStressTxn(cli *clientv3.Client, keyTxnSuffixRange, txnOps int) stressFunc {
 	keys := make([]string, keyTxnSuffixRange)
 	for i := range keys {
 		keys[i] = fmt.Sprintf("/k%03d", i)
 	}
-	return writeTxn(kvc, keys, txnOps)
+	return writeTxn(cli, keys, txnOps)
 }
 
-func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc {
+func writeTxn(cli *clientv3.Client, keys []string, txnOps int) stressFunc {
 	return func(ctx context.Context) (error, int64) {
 		ks := make(map[string]struct{}, txnOps)
 		for len(ks) != txnOps {
@@ -244,99 +243,75 @@ func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc {
 		for k := range ks {
 			selected = append(selected, k)
 		}
-		com, delOp, putOp := getTxnReqs(selected[0], "bar00")
-		txnReq := &pb.TxnRequest{
-			Compare: []*pb.Compare{com},
-			Success: []*pb.RequestOp{delOp},
-			Failure: []*pb.RequestOp{putOp},
-		}
-
-		// add nested txns if any
-		for i := 1; i < txnOps; i++ {
+		com, delOp, putOp := getTxnOps(selected[0], "bar00")
+		thenOps := []clientv3.Op{delOp}
+		elseOps := []clientv3.Op{putOp}
+		for i := 1; i < txnOps; i++ { // nested txns
 			k, v := selected[i], fmt.Sprintf("bar%02d", i)
-			com, delOp, putOp = getTxnReqs(k, v)
-			nested := &pb.RequestOp{
-				Request: &pb.RequestOp_RequestTxn{
-					RequestTxn: &pb.TxnRequest{
-						Compare: []*pb.Compare{com},
-						Success: []*pb.RequestOp{delOp},
-						Failure: []*pb.RequestOp{putOp},
-					},
-				},
-			}
-			txnReq.Success = append(txnReq.Success, nested)
-			txnReq.Failure = append(txnReq.Failure, nested)
+			com, delOp, putOp = getTxnOps(k, v)
+			txnOp := clientv3.OpTxn(
+				[]clientv3.Cmp{com},
+				[]clientv3.Op{delOp},
+				[]clientv3.Op{putOp},
+			)
+			thenOps = append(thenOps, txnOp)
+			elseOps = append(elseOps, txnOp)
 		}
-
-		_, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false))
+		_, err := cli.Txn(ctx).
+			If(com).
+			Then(thenOps...).
+			Else(elseOps...).
+			Commit()
 		return err, int64(txnOps)
 	}
 }
 
-func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) {
+func getTxnOps(k, v string) (
+	cmp clientv3.Cmp,
+	dop clientv3.Op,
+	pop clientv3.Op) {
 	// if key exists (version > 0)
-	com = &pb.Compare{
-		Key:         []byte(key),
-		Target:      pb.Compare_VERSION,
-		Result:      pb.Compare_GREATER,
-		TargetUnion: &pb.Compare_Version{Version: 0},
-	}
-	delOp = &pb.RequestOp{
-		Request: &pb.RequestOp_RequestDeleteRange{
-			RequestDeleteRange: &pb.DeleteRangeRequest{
-				Key: []byte(key),
-			},
-		},
-	}
-	putOp = &pb.RequestOp{
-		Request: &pb.RequestOp_RequestPut{
-			RequestPut: &pb.PutRequest{
-				Key:   []byte(key),
-				Value: []byte(val),
-			},
-		},
-	}
-	return com, delOp, putOp
+	cmp = clientv3.Compare(clientv3.Version(k), ">", 0)
+	dop = clientv3.OpDelete(k)
+	pop = clientv3.OpPut(k, v)
+	return cmp, dop, pop
 }
 
-func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
+func newStressRange(cli *clientv3.Client, keySuffixRange int) stressFunc {
 	return func(ctx context.Context) (error, int64) {
-		_, err := kvc.Range(ctx, &pb.RangeRequest{
-			Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
-		}, grpc.FailFast(false))
+		_, err := cli.Get(ctx, fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)))
 		return err, 0
 	}
 }
 
-func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
+func newStressRangeInterval(cli *clientv3.Client, keySuffixRange int) stressFunc {
 	return func(ctx context.Context) (error, int64) {
 		start := rand.Intn(keySuffixRange)
 		end := start + 500
-		_, err := kvc.Range(ctx, &pb.RangeRequest{
-			Key:      []byte(fmt.Sprintf("foo%016x", start)),
-			RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
-		}, grpc.FailFast(false))
+		_, err := cli.Get(
+			ctx,
+			fmt.Sprintf("foo%016x", start),
+			clientv3.WithRange(fmt.Sprintf("foo%016x", end)),
+		)
 		return err, 0
 	}
 }
 
-func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc {
+func newStressDelete(cli *clientv3.Client, keySuffixRange int) stressFunc {
 	return func(ctx context.Context) (error, int64) {
-		_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
-			Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
-		}, grpc.FailFast(false))
+		_, err := cli.Delete(ctx, fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)))
 		return err, 1
 	}
 }
 
-func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
+func newStressDeleteInterval(cli *clientv3.Client, keySuffixRange int) stressFunc {
 	return func(ctx context.Context) (error, int64) {
 		start := rand.Intn(keySuffixRange)
 		end := start + 500
-		resp, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
-			Key:      []byte(fmt.Sprintf("foo%016x", start)),
-			RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
-		}, grpc.FailFast(false))
+		resp, err := cli.Delete(ctx,
+			fmt.Sprintf("foo%016x", start),
+			clientv3.WithRange(fmt.Sprintf("foo%016x", end)),
+		)
 		if err == nil {
 			return nil, resp.Deleted
 		}

+ 35 - 40
tools/functional-tester/tester/stress_lease.go

@@ -22,8 +22,8 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
 
 	"go.uber.org/zap"
@@ -41,11 +41,9 @@ type leaseStresser struct {
 	lg *zap.Logger
 
 	m      *rpcpb.Member
-	cancel func()
-	conn   *grpc.ClientConn
-	kvc    pb.KVClient
-	lc     pb.LeaseClient
+	cli    *clientv3.Client
 	ctx    context.Context
+	cancel func()
 
 	rateLimiter *rate.Limiter
 	// atomicModifiedKey records the number of keys created and deleted during a test case
@@ -118,7 +116,6 @@ func (ls *leaseStresser) setupOnce() error {
 	}
 
 	ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)}
-
 	return nil
 }
 
@@ -132,20 +129,19 @@ func (ls *leaseStresser) Stress() error {
 		return err
 	}
 
-	conn, err := ls.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(1 * time.Second))
+	ctx, cancel := context.WithCancel(context.Background())
+	ls.ctx = ctx
+	ls.cancel = cancel
+
+	cli, err := ls.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(1 * time.Second))
 	if err != nil {
 		return fmt.Errorf("%v (%s)", err, ls.m.EtcdClientEndpoint)
 	}
-	ls.conn = conn
-	ls.kvc = pb.NewKVClient(conn)
-	ls.lc = pb.NewLeaseClient(conn)
+	ls.cli = cli
+
 	ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
 	ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)}
 
-	ctx, cancel := context.WithCancel(context.Background())
-	ls.cancel = cancel
-	ls.ctx = ctx
-
 	ls.runWg.Add(1)
 	go ls.run()
 	return nil
@@ -299,17 +295,17 @@ func (ls *leaseStresser) randomlyDropLeases() {
 }
 
 func (ls *leaseStresser) createLease(ttl int64) (int64, error) {
-	resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: ttl})
+	resp, err := ls.cli.Grant(ls.ctx, ttl)
 	if err != nil {
 		return -1, err
 	}
-	return resp.ID, nil
+	return int64(resp.ID), nil
 }
 
 func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 	defer ls.aliveWg.Done()
 	ctx, cancel := context.WithCancel(ls.ctx)
-	stream, err := ls.lc.LeaseKeepAlive(ctx)
+	stream, err := ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID))
 	defer func() { cancel() }()
 	for {
 		select {
@@ -347,42 +343,36 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 			)
 			cancel()
 			ctx, cancel = context.WithCancel(ls.ctx)
-			stream, err = ls.lc.LeaseKeepAlive(ctx)
+			stream, err = ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID))
 			cancel()
 			continue
 		}
-
-		ls.lg.Debug(
-			"keepLeaseAlive stream sends lease keepalive request",
-			zap.String("endpoint", ls.m.EtcdClientEndpoint),
-			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
-		)
-		err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
 		if err != nil {
 			ls.lg.Debug(
-				"keepLeaseAlive stream failed to send lease keepalive request",
+				"keepLeaseAlive failed to receive lease keepalive response",
 				zap.String("endpoint", ls.m.EtcdClientEndpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 				zap.Error(err),
 			)
 			continue
 		}
-		leaseRenewTime := time.Now()
+
 		ls.lg.Debug(
-			"keepLeaseAlive stream sent lease keepalive request",
+			"keepLeaseAlive waiting on lease stream",
 			zap.String("endpoint", ls.m.EtcdClientEndpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 		)
-		respRC, err := stream.Recv()
-		if err != nil {
+		leaseRenewTime := time.Now()
+		respRC := <-stream
+		if respRC == nil {
 			ls.lg.Debug(
-				"keepLeaseAlive stream failed to receive lease keepalive response",
+				"keepLeaseAlive received nil lease keepalive response",
 				zap.String("endpoint", ls.m.EtcdClientEndpoint),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
-				zap.Error(err),
 			)
 			continue
 		}
+
 		// lease expires after TTL become 0
 		// don't send keepalive if the lease has expired
 		if respRC.TTL <= 0 {
@@ -409,16 +399,18 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 // the format of key is the concat of leaseID + '_' + '<order of key creation>'
 // e.g 5186835655248304152_0 for first created key and 5186835655248304152_1 for second created key
 func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error {
-	var txnPuts []*pb.RequestOp
+	var txnPuts []clientv3.Op
 	for j := 0; j < ls.keysPerLease; j++ {
-		txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(fmt.Sprintf("%d%s%d", leaseID, "_", j)),
-			Value: []byte(fmt.Sprintf("bar")), Lease: leaseID}}}
+		txnput := clientv3.OpPut(
+			fmt.Sprintf("%d%s%d", leaseID, "_", j),
+			fmt.Sprintf("bar"),
+			clientv3.WithLease(clientv3.LeaseID(leaseID)),
+		)
 		txnPuts = append(txnPuts, txnput)
 	}
 	// keep retrying until lease is not found or ctx is being canceled
 	for ls.ctx.Err() == nil {
-		txn := &pb.TxnRequest{Success: txnPuts}
-		_, err := ls.kvc.Txn(ls.ctx, txn)
+		_, err := ls.cli.Txn(ls.ctx).Then(txnPuts...).Commit()
 		if err == nil {
 			// since all created keys will be deleted too, the number of operations on keys will be roughly 2x the number of created keys
 			atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease))
@@ -437,9 +429,10 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
 	if rand.Intn(2) != 0 {
 		return false, nil
 	}
+
 	// keep retrying until a lease is dropped or ctx is being canceled
 	for ls.ctx.Err() == nil {
-		_, err := ls.lc.LeaseRevoke(ls.ctx, &pb.LeaseRevokeRequest{ID: leaseID})
+		_, err := ls.cli.Revoke(ls.ctx, clientv3.LeaseID(leaseID))
 		if err == nil || rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
 			return true, nil
 		}
@@ -454,7 +447,9 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
 	return false, ls.ctx.Err()
 }
 
-func (ls *leaseStresser) Pause() { ls.Close() }
+func (ls *leaseStresser) Pause() {
+	ls.Close()
+}
 
 func (ls *leaseStresser) Close() {
 	ls.lg.Info(
@@ -464,7 +459,7 @@ func (ls *leaseStresser) Close() {
 	ls.cancel()
 	ls.runWg.Wait()
 	ls.aliveWg.Wait()
-	ls.conn.Close()
+	ls.cli.Close()
 	ls.lg.Info(
 		"lease stresser is closed",
 		zap.String("endpoint", ls.m.EtcdClientEndpoint),