Browse Source

functional-tester/tester: use "clientv3" for key stresser

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 years ago
parent
commit
00ed41d175
1 changed files with 61 additions and 86 deletions
  1. 61 86
      tools/functional-tester/tester/stress_key.go

+ 61 - 86
tools/functional-tester/tester/stress_key.go

@@ -22,9 +22,9 @@ import (
 	"sync/atomic"
 	"sync/atomic"
 	"time"
 	"time"
 
 
+	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
 	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
 
 
 	"go.uber.org/zap"
 	"go.uber.org/zap"
@@ -51,7 +51,7 @@ type keyStresser struct {
 	wg sync.WaitGroup
 	wg sync.WaitGroup
 
 
 	cancel func()
 	cancel func()
-	conn   *grpc.ClientConn
+	cli    *clientv3.Client
 	// atomicModifiedKeys records the number of keys created and deleted by the stresser.
 	// atomicModifiedKeys records the number of keys created and deleted by the stresser.
 	atomicModifiedKeys int64
 	atomicModifiedKeys int64
 
 
@@ -60,35 +60,33 @@ type keyStresser struct {
 
 
 func (s *keyStresser) Stress() error {
 func (s *keyStresser) Stress() error {
 	// TODO: add backoff option
 	// TODO: add backoff option
-	conn, err := s.m.DialEtcdGRPCServer()
+	cli, err := s.m.CreateEtcdClient()
 	if err != nil {
 	if err != nil {
 		return fmt.Errorf("%v (%q)", err, s.m.EtcdClientEndpoint)
 		return fmt.Errorf("%v (%q)", err, s.m.EtcdClientEndpoint)
 	}
 	}
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx, cancel := context.WithCancel(context.Background())
 
 
 	s.wg.Add(s.N)
 	s.wg.Add(s.N)
-	s.conn = conn
+	s.cli = cli
 	s.cancel = cancel
 	s.cancel = cancel
 
 
-	kvc := pb.NewKVClient(conn)
-
 	var stressEntries = []stressEntry{
 	var stressEntries = []stressEntry{
-		{weight: 0.7, f: newStressPut(kvc, s.keySuffixRange, s.keySize)},
+		{weight: 0.7, f: newStressPut(cli, s.keySuffixRange, s.keySize)},
 		{
 		{
 			weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize),
 			weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize),
-			f:      newStressPut(kvc, s.keySuffixRange, s.keyLargeSize),
+			f:      newStressPut(cli, s.keySuffixRange, s.keyLargeSize),
 		},
 		},
-		{weight: 0.07, f: newStressRange(kvc, s.keySuffixRange)},
-		{weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange)},
-		{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
-		{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
+		{weight: 0.07, f: newStressRange(cli, s.keySuffixRange)},
+		{weight: 0.07, f: newStressRangeInterval(cli, s.keySuffixRange)},
+		{weight: 0.07, f: newStressDelete(cli, s.keySuffixRange)},
+		{weight: 0.07, f: newStressDeleteInterval(cli, s.keySuffixRange)},
 	}
 	}
 	if s.keyTxnSuffixRange > 0 {
 	if s.keyTxnSuffixRange > 0 {
 		// adjust to make up ±70% of workloads with writes
 		// adjust to make up ±70% of workloads with writes
 		stressEntries[0].weight = 0.35
 		stressEntries[0].weight = 0.35
 		stressEntries = append(stressEntries, stressEntry{
 		stressEntries = append(stressEntries, stressEntry{
 			weight: 0.35,
 			weight: 0.35,
-			f:      newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps),
+			f:      newStressTxn(cli, s.keyTxnSuffixRange, s.keyTxnOps),
 		})
 		})
 	}
 	}
 	s.stressTable = createStressTable(stressEntries)
 	s.stressTable = createStressTable(stressEntries)
@@ -167,7 +165,7 @@ func (s *keyStresser) Pause() {
 
 
 func (s *keyStresser) Close() {
 func (s *keyStresser) Close() {
 	s.cancel()
 	s.cancel()
-	s.conn.Close()
+	s.cli.Close()
 	s.wg.Wait()
 	s.wg.Wait()
 
 
 	s.lg.Info(
 	s.lg.Info(
@@ -216,25 +214,26 @@ func (st *stressTable) choose() stressFunc {
 	return st.entries[idx].f
 	return st.entries[idx].f
 }
 }
 
 
-func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
+func newStressPut(cli *clientv3.Client, keySuffixRange, keySize int) stressFunc {
 	return func(ctx context.Context) (error, int64) {
 	return func(ctx context.Context) (error, int64) {
-		_, err := kvc.Put(ctx, &pb.PutRequest{
-			Key:   []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
-			Value: randBytes(keySize),
-		}, grpc.FailFast(false))
+		_, err := cli.Put(
+			ctx,
+			fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)),
+			string(randBytes(keySize)),
+		)
 		return err, 1
 		return err, 1
 	}
 	}
 }
 }
 
 
-func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc {
+func newStressTxn(cli *clientv3.Client, keyTxnSuffixRange, txnOps int) stressFunc {
 	keys := make([]string, keyTxnSuffixRange)
 	keys := make([]string, keyTxnSuffixRange)
 	for i := range keys {
 	for i := range keys {
 		keys[i] = fmt.Sprintf("/k%03d", i)
 		keys[i] = fmt.Sprintf("/k%03d", i)
 	}
 	}
-	return writeTxn(kvc, keys, txnOps)
+	return writeTxn(cli, keys, txnOps)
 }
 }
 
 
-func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc {
+func writeTxn(cli *clientv3.Client, keys []string, txnOps int) stressFunc {
 	return func(ctx context.Context) (error, int64) {
 	return func(ctx context.Context) (error, int64) {
 		ks := make(map[string]struct{}, txnOps)
 		ks := make(map[string]struct{}, txnOps)
 		for len(ks) != txnOps {
 		for len(ks) != txnOps {
@@ -244,99 +243,75 @@ func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc {
 		for k := range ks {
 		for k := range ks {
 			selected = append(selected, k)
 			selected = append(selected, k)
 		}
 		}
-		com, delOp, putOp := getTxnReqs(selected[0], "bar00")
-		txnReq := &pb.TxnRequest{
-			Compare: []*pb.Compare{com},
-			Success: []*pb.RequestOp{delOp},
-			Failure: []*pb.RequestOp{putOp},
-		}
-
-		// add nested txns if any
-		for i := 1; i < txnOps; i++ {
+		com, delOp, putOp := getTxnOps(selected[0], "bar00")
+		thenOps := []clientv3.Op{delOp}
+		elseOps := []clientv3.Op{putOp}
+		for i := 1; i < txnOps; i++ { // nested txns
 			k, v := selected[i], fmt.Sprintf("bar%02d", i)
 			k, v := selected[i], fmt.Sprintf("bar%02d", i)
-			com, delOp, putOp = getTxnReqs(k, v)
-			nested := &pb.RequestOp{
-				Request: &pb.RequestOp_RequestTxn{
-					RequestTxn: &pb.TxnRequest{
-						Compare: []*pb.Compare{com},
-						Success: []*pb.RequestOp{delOp},
-						Failure: []*pb.RequestOp{putOp},
-					},
-				},
-			}
-			txnReq.Success = append(txnReq.Success, nested)
-			txnReq.Failure = append(txnReq.Failure, nested)
+			com, delOp, putOp = getTxnOps(k, v)
+			txnOp := clientv3.OpTxn(
+				[]clientv3.Cmp{com},
+				[]clientv3.Op{delOp},
+				[]clientv3.Op{putOp},
+			)
+			thenOps = append(thenOps, txnOp)
+			elseOps = append(elseOps, txnOp)
 		}
 		}
-
-		_, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false))
+		_, err := cli.Txn(ctx).
+			If(com).
+			Else(elseOps...).
+			Then(thenOps...).
+			Commit()
 		return err, int64(txnOps)
 		return err, int64(txnOps)
 	}
 	}
 }
 }
 
 
-func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) {
+func getTxnOps(k, v string) (
+	cmp clientv3.Cmp,
+	dop clientv3.Op,
+	pop clientv3.Op) {
 	// if key exists (version > 0)
 	// if key exists (version > 0)
-	com = &pb.Compare{
-		Key:         []byte(key),
-		Target:      pb.Compare_VERSION,
-		Result:      pb.Compare_GREATER,
-		TargetUnion: &pb.Compare_Version{Version: 0},
-	}
-	delOp = &pb.RequestOp{
-		Request: &pb.RequestOp_RequestDeleteRange{
-			RequestDeleteRange: &pb.DeleteRangeRequest{
-				Key: []byte(key),
-			},
-		},
-	}
-	putOp = &pb.RequestOp{
-		Request: &pb.RequestOp_RequestPut{
-			RequestPut: &pb.PutRequest{
-				Key:   []byte(key),
-				Value: []byte(val),
-			},
-		},
-	}
-	return com, delOp, putOp
+	cmp = clientv3.Compare(clientv3.Version(k), ">", 0)
+	dop = clientv3.OpDelete(k)
+	pop = clientv3.OpPut(k, v)
+	return cmp, dop, pop
 }
 }
 
 
-func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
+func newStressRange(cli *clientv3.Client, keySuffixRange int) stressFunc {
 	return func(ctx context.Context) (error, int64) {
 	return func(ctx context.Context) (error, int64) {
-		_, err := kvc.Range(ctx, &pb.RangeRequest{
-			Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
-		}, grpc.FailFast(false))
+		_, err := cli.Get(ctx, fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)))
 		return err, 0
 		return err, 0
 	}
 	}
 }
 }
 
 
-func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
+func newStressRangeInterval(cli *clientv3.Client, keySuffixRange int) stressFunc {
 	return func(ctx context.Context) (error, int64) {
 	return func(ctx context.Context) (error, int64) {
 		start := rand.Intn(keySuffixRange)
 		start := rand.Intn(keySuffixRange)
 		end := start + 500
 		end := start + 500
-		_, err := kvc.Range(ctx, &pb.RangeRequest{
-			Key:      []byte(fmt.Sprintf("foo%016x", start)),
-			RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
-		}, grpc.FailFast(false))
+		_, err := cli.Get(
+			ctx,
+			fmt.Sprintf("foo%016x", start),
+			clientv3.WithRange(fmt.Sprintf("foo%016x", end)),
+		)
 		return err, 0
 		return err, 0
 	}
 	}
 }
 }
 
 
-func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc {
+func newStressDelete(cli *clientv3.Client, keySuffixRange int) stressFunc {
 	return func(ctx context.Context) (error, int64) {
 	return func(ctx context.Context) (error, int64) {
-		_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
-			Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
-		}, grpc.FailFast(false))
+		_, err := cli.Delete(ctx, fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)))
 		return err, 1
 		return err, 1
 	}
 	}
 }
 }
 
 
-func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
+func newStressDeleteInterval(cli *clientv3.Client, keySuffixRange int) stressFunc {
 	return func(ctx context.Context) (error, int64) {
 	return func(ctx context.Context) (error, int64) {
 		start := rand.Intn(keySuffixRange)
 		start := rand.Intn(keySuffixRange)
 		end := start + 500
 		end := start + 500
-		resp, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
-			Key:      []byte(fmt.Sprintf("foo%016x", start)),
-			RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
-		}, grpc.FailFast(false))
+		resp, err := cli.Delete(ctx,
+			fmt.Sprintf("foo%016x", start),
+			clientv3.WithRange(fmt.Sprintf("foo%016x", end)),
+		)
 		if err == nil {
 		if err == nil {
 			return nil, resp.Deleted
 			return nil, resp.Deleted
 		}
 		}