Browse Source

Merge pull request #8510 from gyuho/txn-stresser

etcd-tester: add txn stresser
Gyuho Lee 8 years ago
parent
commit
52f73c5a6c

+ 3 - 0
test

@@ -133,6 +133,9 @@ function functional_pass {
 		-peer-ports 12380,22380,32380 \
 		-limit 1 \
 		-schedule-cases "0 1 2 3 4 5" \
+		-stress-qps 1000 \
+		-stress-key-txn-count 100 \
+		-stress-key-txn-ops 10 \
 		-exit-on-failure && echo "'etcd-tester' succeeded"
 	ETCD_TESTER_EXIT_CODE=$?
 	echo "ETCD_TESTER_EXIT_CODE:" ${ETCD_TESTER_EXIT_CODE}

+ 87 - 3
tools/functional-tester/etcd-tester/key_stresser.go

@@ -34,9 +34,11 @@ import (
 type keyStresser struct {
 	Endpoint string
 
-	keyLargeSize   int
-	keySize        int
-	keySuffixRange int
+	keyLargeSize      int
+	keySize           int
+	keySuffixRange    int
+	keyTxnSuffixRange int
+	keyTxnOps         int
 
 	N int
 
@@ -77,6 +79,15 @@ func (s *keyStresser) Stress() error {
 		{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
 		{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
 	}
+	if s.keyTxnSuffixRange > 0 {
+		// adjust to make up ±70% of workloads with writes
+		stressEntries[0].weight = 0.24
+		stressEntries[1].weight = 0.24
+		stressEntries = append(stressEntries, stressEntry{
+			weight: 0.24,
+			f:      newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps),
+		})
+	}
 	s.stressTable = createStressTable(stressEntries)
 
 	for i := 0; i < s.N; i++ {
@@ -202,6 +213,79 @@ func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
 	}
 }
 
+func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc {
+	keys := make([]string, keyTxnSuffixRange)
+	for i := range keys {
+		keys[i] = fmt.Sprintf("/k%03d", i)
+	}
+	return writeTxn(kvc, keys, txnOps)
+}
+
+func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc {
+	return func(ctx context.Context) (error, int64) {
+		ks := make(map[string]struct{}, txnOps)
+		for len(ks) != txnOps {
+			ks[keys[rand.Intn(64)]] = struct{}{}
+		}
+		selected := make([]string, 0, txnOps)
+		for k := range ks {
+			selected = append(selected, k)
+		}
+		com, delOp, putOp := getTxnReqs(selected[0], "bar00")
+		txnReq := &pb.TxnRequest{
+			Compare: []*pb.Compare{com},
+			Success: []*pb.RequestOp{delOp},
+			Failure: []*pb.RequestOp{putOp},
+		}
+
+		// add nested txns if any
+		for i := 1; i < txnOps; i++ {
+			k, v := selected[i], fmt.Sprintf("bar%02d", i)
+			com, delOp, putOp = getTxnReqs(k, v)
+			nested := &pb.RequestOp{
+				Request: &pb.RequestOp_RequestTxn{
+					RequestTxn: &pb.TxnRequest{
+						Compare: []*pb.Compare{com},
+						Success: []*pb.RequestOp{delOp},
+						Failure: []*pb.RequestOp{putOp},
+					},
+				},
+			}
+			txnReq.Success = append(txnReq.Success, nested)
+			txnReq.Failure = append(txnReq.Failure, nested)
+		}
+
+		_, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false))
+		return err, int64(txnOps)
+	}
+}
+
+func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) {
+	// if key exists (version > 0)
+	com = &pb.Compare{
+		Key:         []byte(key),
+		Target:      pb.Compare_VERSION,
+		Result:      pb.Compare_GREATER,
+		TargetUnion: &pb.Compare_Version{Version: 0},
+	}
+	delOp = &pb.RequestOp{
+		Request: &pb.RequestOp_RequestDeleteRange{
+			RequestDeleteRange: &pb.DeleteRangeRequest{
+				Key: []byte(key),
+			},
+		},
+	}
+	putOp = &pb.RequestOp{
+		Request: &pb.RequestOp_RequestPut{
+			RequestPut: &pb.PutRequest{
+				Key:   []byte(key),
+				Value: []byte(val),
+			},
+		},
+	}
+	return com, delOp, putOp
+}
+
 func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
 	return func(ctx context.Context) (error, int64) {
 		_, err := kvc.Range(ctx, &pb.RangeRequest{

+ 16 - 6
tools/functional-tester/etcd-tester/main.go

@@ -47,6 +47,8 @@ func main() {
 	stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.")
 	stressKeySize := flag.Uint("stress-key-size", 100, "the size of each small key written into etcd.")
 	stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.")
+	stressKeyTxnSuffixRange := flag.Uint("stress-key-txn-count", 100, "the count of key range written into etcd txn (max 100).")
+	stressKeyTxnOps := flag.Uint("stress-key-txn-ops", 1, "number of operations per a transaction (max 64).")
 	limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
 	exitOnFailure := flag.Bool("exit-on-failure", false, "exit tester on first failure")
 	stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
@@ -120,15 +122,23 @@ func main() {
 	}
 
 	scfg := stressConfig{
-		rateLimiter:    rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
-		keyLargeSize:   int(*stressKeyLargeSize),
-		keySize:        int(*stressKeySize),
-		keySuffixRange: int(*stressKeySuffixRange),
-		numLeases:      10,
-		keysPerLease:   10,
+		rateLimiter:       rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
+		keyLargeSize:      int(*stressKeyLargeSize),
+		keySize:           int(*stressKeySize),
+		keySuffixRange:    int(*stressKeySuffixRange),
+		keyTxnSuffixRange: int(*stressKeyTxnSuffixRange),
+		keyTxnOps:         int(*stressKeyTxnOps),
+		numLeases:         10,
+		keysPerLease:      10,
 
 		etcdRunnerPath: *etcdRunnerPath,
 	}
+	if scfg.keyTxnSuffixRange > 100 {
+		plog.Fatalf("stress-key-txn-count is maximum 100, got %d", scfg.keyTxnSuffixRange)
+	}
+	if scfg.keyTxnOps > 64 {
+		plog.Fatalf("stress-key-txn-ops is maximum 64, got %d", scfg.keyTxnOps)
+	}
 
 	t := &tester{
 		failures:      schedule,

+ 13 - 9
tools/functional-tester/etcd-tester/stresser.go

@@ -113,9 +113,11 @@ func (cs *compositeStresser) Checker() Checker {
 }
 
 type stressConfig struct {
-	keyLargeSize   int
-	keySize        int
-	keySuffixRange int
+	keyLargeSize      int
+	keySize           int
+	keySuffixRange    int
+	keyTxnSuffixRange int
+	keyTxnOps         int
 
 	numLeases    int
 	keysPerLease int
@@ -142,12 +144,14 @@ func NewStresser(s string, sc *stressConfig, m *member) Stresser {
 		// TODO: Too intensive stressers can panic etcd member with
 		// 'out of memory' error. Put rate limits in server side.
 		return &keyStresser{
-			Endpoint:       m.grpcAddr(),
-			keyLargeSize:   sc.keyLargeSize,
-			keySize:        sc.keySize,
-			keySuffixRange: sc.keySuffixRange,
-			N:              100,
-			rateLimiter:    sc.rateLimiter,
+			Endpoint:          m.grpcAddr(),
+			keyLargeSize:      sc.keyLargeSize,
+			keySize:           sc.keySize,
+			keySuffixRange:    sc.keySuffixRange,
+			keyTxnSuffixRange: sc.keyTxnSuffixRange,
+			keyTxnOps:         sc.keyTxnOps,
+			N:                 100,
+			rateLimiter:       sc.rateLimiter,
 		}
 	case "v2keys":
 		return &v2Stresser{