Browse Source

functional/tester: configurable stresser weights

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 years ago
parent
commit
b1832d2f3c

+ 35 - 34
functional/tester/cluster_read_config.go

@@ -52,6 +52,41 @@ func read(lg *zap.Logger, fpath string) (*Cluster, error) {
 		}
 	}
 
+	if len(clus.Tester.Cases) == 0 {
+		return nil, errors.New("Cases not found")
+	}
+	if clus.Tester.DelayLatencyMs <= clus.Tester.DelayLatencyMsRv*5 {
+		return nil, fmt.Errorf("delay latency %d ms must be greater than 5x of delay latency random variable %d ms", clus.Tester.DelayLatencyMs, clus.Tester.DelayLatencyMsRv)
+	}
+	if clus.Tester.UpdatedDelayLatencyMs == 0 {
+		clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
+	}
+
+	for _, v := range clus.Tester.Cases {
+		if _, ok := rpcpb.Case_value[v]; !ok {
+			return nil, fmt.Errorf("%q is not defined in 'rpcpb.Case_value'", v)
+		}
+	}
+
+	for _, s := range clus.Tester.Stressers {
+		if _, ok := rpcpb.StresserType_value[s.Type]; !ok {
+			return nil, fmt.Errorf("unknown 'StresserType' %+v", s)
+		}
+	}
+
+	for _, v := range clus.Tester.Checkers {
+		if _, ok := rpcpb.Checker_value[v]; !ok {
+			return nil, fmt.Errorf("Checker is unknown; got %q", v)
+		}
+	}
+
+	if clus.Tester.StressKeySuffixRangeTxn > 100 {
+		return nil, fmt.Errorf("StressKeySuffixRangeTxn maximum value is 100, got %v", clus.Tester.StressKeySuffixRangeTxn)
+	}
+	if clus.Tester.StressKeyTxnOps > 64 {
+		return nil, fmt.Errorf("StressKeyTxnOps maximum value is 64, got %v", clus.Tester.StressKeyTxnOps)
+	}
+
 	for i, mem := range clus.Members {
 		if mem.EtcdExec == "embed" && failpointsEnabled {
 			return nil, errors.New("EtcdExec 'embed' cannot be run with failpoints enabled")
@@ -337,39 +372,5 @@ func read(lg *zap.Logger, fpath string) (*Cluster, error) {
 		}
 	}
 
-	if len(clus.Tester.Cases) == 0 {
-		return nil, errors.New("Cases not found")
-	}
-	if clus.Tester.DelayLatencyMs <= clus.Tester.DelayLatencyMsRv*5 {
-		return nil, fmt.Errorf("delay latency %d ms must be greater than 5x of delay latency random variable %d ms", clus.Tester.DelayLatencyMs, clus.Tester.DelayLatencyMsRv)
-	}
-	if clus.Tester.UpdatedDelayLatencyMs == 0 {
-		clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
-	}
-
-	for _, v := range clus.Tester.Cases {
-		if _, ok := rpcpb.Case_value[v]; !ok {
-			return nil, fmt.Errorf("%q is not defined in 'rpcpb.Case_value'", v)
-		}
-	}
-
-	for _, v := range clus.Tester.Stressers {
-		if _, ok := rpcpb.Stresser_value[v]; !ok {
-			return nil, fmt.Errorf("Stresser is unknown; got %q", v)
-		}
-	}
-	for _, v := range clus.Tester.Checkers {
-		if _, ok := rpcpb.Checker_value[v]; !ok {
-			return nil, fmt.Errorf("Checker is unknown; got %q", v)
-		}
-	}
-
-	if clus.Tester.StressKeySuffixRangeTxn > 100 {
-		return nil, fmt.Errorf("StressKeySuffixRangeTxn maximum value is 100, got %v", clus.Tester.StressKeySuffixRangeTxn)
-	}
-	if clus.Tester.StressKeyTxnOps > 64 {
-		return nil, fmt.Errorf("StressKeyTxnOps maximum value is 64, got %v", clus.Tester.StressKeyTxnOps)
-	}
-
 	return clus, err
 }

+ 13 - 4
functional/tester/cluster_test.go

@@ -232,10 +232,19 @@ func Test_read(t *testing.T) {
 				"NO_FAIL_WITH_STRESS",
 				"NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS",
 			},
-			FailpointCommands:       []string{`panic("etcd-tester")`},
-			RunnerExecPath:          "./bin/etcd-runner",
-			ExternalExecPath:        "",
-			Stressers:               []string{"KV", "LEASE"},
+			FailpointCommands: []string{`panic("etcd-tester")`},
+			RunnerExecPath:    "./bin/etcd-runner",
+			ExternalExecPath:  "",
+			Stressers: []*rpcpb.Stresser{
+				{Type: "KV_WRITE_SMALL", Weight: 0.35},
+				{Type: "KV_WRITE_LARGE", Weight: 0.002},
+				{Type: "KV_READ_ONE_KEY", Weight: 0.07},
+				{Type: "KV_READ_RANGE", Weight: 0.07},
+				{Type: "KV_DELETE_ONE_KEY", Weight: 0.07},
+				{Type: "KV_DELETE_RANGE", Weight: 0.07},
+				{Type: "KV_TXN_WRITE_DELETE", Weight: 0.35},
+				{Type: "LEASE", Weight: 0.0},
+			},
 			Checkers:                []string{"KV_HASH", "LEASE_EXPIRE"},
 			StressKeySize:           100,
 			StressKeySizeLarge:      32769,

+ 59 - 35
functional/tester/stresser.go

@@ -37,40 +37,60 @@ type Stresser interface {
 
 // newStresser creates stresser from a comma separated list of stresser types.
 func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
-	stressers = make([]Stresser, len(clus.Tester.Stressers))
-	for i, stype := range clus.Tester.Stressers {
+	// TODO: Too intensive stressing clients can panic etcd member with
+	// 'out of memory' error. Put rate limits in server side.
+	ks := &keyStresser{
+		lg:                clus.lg,
+		m:                 m,
+		keySize:           int(clus.Tester.StressKeySize),
+		keyLargeSize:      int(clus.Tester.StressKeySizeLarge),
+		keySuffixRange:    int(clus.Tester.StressKeySuffixRange),
+		keyTxnSuffixRange: int(clus.Tester.StressKeySuffixRangeTxn),
+		keyTxnOps:         int(clus.Tester.StressKeyTxnOps),
+		clientsN:          int(clus.Tester.StressClients),
+		rateLimiter:       clus.rateLimiter,
+	}
+	ksExist := false
+
+	for _, s := range clus.Tester.Stressers {
 		clus.lg.Info(
 			"creating stresser",
-			zap.String("type", stype),
+			zap.String("type", s.Type),
+			zap.Float64("weight", s.Weight),
 			zap.String("endpoint", m.EtcdClientEndpoint),
 		)
-
-		switch stype {
-		case "KV":
-			// TODO: Too intensive stressing clients can panic etcd member with
-			// 'out of memory' error. Put rate limits in server side.
-			stressers[i] = &keyStresser{
-				stype:             rpcpb.Stresser_KV,
-				lg:                clus.lg,
-				m:                 m,
-				keySize:           int(clus.Tester.StressKeySize),
-				keyLargeSize:      int(clus.Tester.StressKeySizeLarge),
-				keySuffixRange:    int(clus.Tester.StressKeySuffixRange),
-				keyTxnSuffixRange: int(clus.Tester.StressKeySuffixRangeTxn),
-				keyTxnOps:         int(clus.Tester.StressKeyTxnOps),
-				clientsN:          int(clus.Tester.StressClients),
-				rateLimiter:       clus.rateLimiter,
-			}
+		switch s.Type {
+		case "KV_WRITE_SMALL":
+			ksExist = true
+			ks.weightKVWriteSmall = s.Weight
+		case "KV_WRITE_LARGE":
+			ksExist = true
+			ks.weightKVWriteLarge = s.Weight
+		case "KV_READ_ONE_KEY":
+			ksExist = true
+			ks.weightKVReadOneKey = s.Weight
+		case "KV_READ_RANGE":
+			ksExist = true
+			ks.weightKVReadRange = s.Weight
+		case "KV_DELETE_ONE_KEY":
+			ksExist = true
+			ks.weightKVDeleteOneKey = s.Weight
+		case "KV_DELETE_RANGE":
+			ksExist = true
+			ks.weightKVDeleteRange = s.Weight
+		case "KV_TXN_WRITE_DELETE":
+			ksExist = true
+			ks.weightKVTxnWriteDelete = s.Weight
 
 		case "LEASE":
-			stressers[i] = &leaseStresser{
-				stype:        rpcpb.Stresser_LEASE,
+			stressers = append(stressers, &leaseStresser{
+				stype:        rpcpb.StresserType_LEASE,
 				lg:           clus.lg,
 				m:            m,
 				numLeases:    10, // TODO: configurable
 				keysPerLease: 10, // TODO: configurable
 				rateLimiter:  clus.rateLimiter,
-			}
+			})
 
 		case "ELECTION_RUNNER":
 			reqRate := 100
@@ -83,15 +103,15 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
 				"--rounds=0", // runs forever
 				"--req-rate", fmt.Sprintf("%v", reqRate),
 			}
-			stressers[i] = newRunnerStresser(
-				rpcpb.Stresser_ELECTION_RUNNER,
+			stressers = append(stressers, newRunnerStresser(
+				rpcpb.StresserType_ELECTION_RUNNER,
 				m.EtcdClientEndpoint,
 				clus.lg,
 				clus.Tester.RunnerExecPath,
 				args,
 				clus.rateLimiter,
 				reqRate,
-			)
+			))
 
 		case "WATCH_RUNNER":
 			reqRate := 100
@@ -105,15 +125,15 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
 				"--rounds=0", // runs forever
 				"--req-rate", fmt.Sprintf("%v", reqRate),
 			}
-			stressers[i] = newRunnerStresser(
-				rpcpb.Stresser_WATCH_RUNNER,
+			stressers = append(stressers, newRunnerStresser(
+				rpcpb.StresserType_WATCH_RUNNER,
 				m.EtcdClientEndpoint,
 				clus.lg,
 				clus.Tester.RunnerExecPath,
 				args,
 				clus.rateLimiter,
 				reqRate,
-			)
+			))
 
 		case "LOCK_RACER_RUNNER":
 			reqRate := 100
@@ -125,15 +145,15 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
 				"--rounds=0", // runs forever
 				"--req-rate", fmt.Sprintf("%v", reqRate),
 			}
-			stressers[i] = newRunnerStresser(
-				rpcpb.Stresser_LOCK_RACER_RUNNER,
+			stressers = append(stressers, newRunnerStresser(
+				rpcpb.StresserType_LOCK_RACER_RUNNER,
 				m.EtcdClientEndpoint,
 				clus.lg,
 				clus.Tester.RunnerExecPath,
 				args,
 				clus.rateLimiter,
 				reqRate,
-			)
+			))
 
 		case "LEASE_RUNNER":
 			args := []string{
@@ -141,16 +161,20 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
 				"--ttl=30",
 				"--endpoints", m.EtcdClientEndpoint,
 			}
-			stressers[i] = newRunnerStresser(
-				rpcpb.Stresser_LEASE_RUNNER,
+			stressers = append(stressers, newRunnerStresser(
+				rpcpb.StresserType_LEASE_RUNNER,
 				m.EtcdClientEndpoint,
 				clus.lg,
 				clus.Tester.RunnerExecPath,
 				args,
 				clus.rateLimiter,
 				0,
-			)
+			))
 		}
 	}
+
+	if ksExist {
+		return append(stressers, ks)
+	}
 	return stressers
 }

+ 26 - 29
functional/tester/stresser_key.go

@@ -36,11 +36,18 @@ import (
 )
 
 type keyStresser struct {
-	stype rpcpb.Stresser
-	lg    *zap.Logger
+	lg *zap.Logger
 
 	m *rpcpb.Member
 
+	weightKVWriteSmall     float64
+	weightKVWriteLarge     float64
+	weightKVReadOneKey     float64
+	weightKVReadRange      float64
+	weightKVDeleteOneKey   float64
+	weightKVDeleteRange    float64
+	weightKVTxnWriteDelete float64
+
 	keySize           int
 	keyLargeSize      int
 	keySuffixRange    int
@@ -75,26 +82,16 @@ func (s *keyStresser) Stress() error {
 	s.ctx, s.cancel = context.WithCancel(context.Background())
 
 	s.wg.Add(s.clientsN)
-	var stressEntries = []stressEntry{
-		{weight: 0.7, f: newStressPut(s.cli, s.keySuffixRange, s.keySize)},
-		{
-			weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize),
-			f:      newStressPut(s.cli, s.keySuffixRange, s.keyLargeSize),
-		},
-		{weight: 0.07, f: newStressRange(s.cli, s.keySuffixRange)},
-		{weight: 0.07, f: newStressRangeInterval(s.cli, s.keySuffixRange)},
-		{weight: 0.07, f: newStressDelete(s.cli, s.keySuffixRange)},
-		{weight: 0.07, f: newStressDeleteInterval(s.cli, s.keySuffixRange)},
-	}
-	if s.keyTxnSuffixRange > 0 {
-		// adjust to make up ±70% of workloads with writes
-		stressEntries[0].weight = 0.35
-		stressEntries = append(stressEntries, stressEntry{
-			weight: 0.35,
-			f:      newStressTxn(s.cli, s.keyTxnSuffixRange, s.keyTxnOps),
-		})
-	}
-	s.stressTable = createStressTable(stressEntries)
+
+	s.stressTable = createStressTable([]stressEntry{
+		{weight: s.weightKVWriteSmall, f: newStressPut(s.cli, s.keySuffixRange, s.keySize)},
+		{weight: s.weightKVWriteLarge, f: newStressPut(s.cli, s.keySuffixRange, s.keyLargeSize)},
+		{weight: s.weightKVReadOneKey, f: newStressRange(s.cli, s.keySuffixRange)},
+		{weight: s.weightKVReadRange, f: newStressRangeInterval(s.cli, s.keySuffixRange)},
+		{weight: s.weightKVDeleteOneKey, f: newStressDelete(s.cli, s.keySuffixRange)},
+		{weight: s.weightKVDeleteRange, f: newStressDeleteInterval(s.cli, s.keySuffixRange)},
+		{weight: s.weightKVTxnWriteDelete, f: newStressTxn(s.cli, s.keyTxnSuffixRange, s.keyTxnOps)},
+	})
 
 	s.emu.Lock()
 	s.paused = false
@@ -106,7 +103,7 @@ func (s *keyStresser) Stress() error {
 
 	s.lg.Info(
 		"stress START",
-		zap.String("stress-type", s.stype.String()),
+		zap.String("stress-type", "KV"),
 		zap.String("endpoint", s.m.EtcdClientEndpoint),
 	)
 	return nil
@@ -163,7 +160,7 @@ func (s *keyStresser) run() {
 		default:
 			s.lg.Warn(
 				"stress run exiting",
-				zap.String("stress-type", s.stype.String()),
+				zap.String("stress-type", "KV"),
 				zap.String("endpoint", s.m.EtcdClientEndpoint),
 				zap.String("error-type", reflect.TypeOf(err).String()),
 				zap.String("error-desc", rpctypes.ErrorDesc(err)),
@@ -198,7 +195,7 @@ func (s *keyStresser) Close() map[string]int {
 
 	s.lg.Info(
 		"stress STOP",
-		zap.String("stress-type", s.stype.String()),
+		zap.String("stress-type", "KV"),
 		zap.String("endpoint", s.m.EtcdClientEndpoint),
 	)
 	return ess
@@ -211,13 +208,13 @@ func (s *keyStresser) ModifiedKeys() int64 {
 type stressFunc func(ctx context.Context) (err error, modifiedKeys int64)
 
 type stressEntry struct {
-	weight float32
+	weight float64
 	f      stressFunc
 }
 
 type stressTable struct {
 	entries    []stressEntry
-	sumWeights float32
+	sumWeights float64
 }
 
 func createStressTable(entries []stressEntry) *stressTable {
@@ -229,8 +226,8 @@ func createStressTable(entries []stressEntry) *stressTable {
 }
 
 func (st *stressTable) choose() stressFunc {
-	v := rand.Float32() * st.sumWeights
-	var sum float32
+	v := rand.Float64() * st.sumWeights
+	var sum float64
 	var idx int
 	for i := range st.entries {
 		sum += st.entries[i].weight

+ 1 - 1
functional/tester/stresser_lease.go

@@ -38,7 +38,7 @@ const (
 )
 
 type leaseStresser struct {
-	stype rpcpb.Stresser
+	stype rpcpb.StresserType
 	lg    *zap.Logger
 
 	m      *rpcpb.Member

+ 2 - 2
functional/tester/stresser_runner.go

@@ -27,7 +27,7 @@ import (
 )
 
 type runnerStresser struct {
-	stype              rpcpb.Stresser
+	stype              rpcpb.StresserType
 	etcdClientEndpoint string
 	lg                 *zap.Logger
 
@@ -42,7 +42,7 @@ type runnerStresser struct {
 }
 
 func newRunnerStresser(
-	stype rpcpb.Stresser,
+	stype rpcpb.StresserType,
 	ep string,
 	lg *zap.Logger,
 	cmdStr string,