123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- // Copyright 2015 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package main
- import (
- "fmt"
- "strings"
- "sync"
- "time"
- "golang.org/x/time/rate"
- )
- type Stresser interface {
- // Stress starts to stress the etcd cluster
- Stress() error
- // Pause stops the stresser from sending requests to etcd. Resume by calling Stress.
- Pause()
- // Close releases all of the Stresser's resources.
- Close()
- // ModifiedKeys reports the number of keys created and deleted by stresser
- ModifiedKeys() int64
- // Checker returns an invariant checker for after the stresser is canceled.
- Checker() Checker
- }
- // nopStresser implements Stresser that does nothing
- type nopStresser struct {
- start time.Time
- qps int
- }
- func (s *nopStresser) Stress() error { return nil }
- func (s *nopStresser) Pause() {}
- func (s *nopStresser) Close() {}
- func (s *nopStresser) ModifiedKeys() int64 {
- return 0
- }
- func (s *nopStresser) Checker() Checker { return nil }
- // compositeStresser implements a Stresser that runs a slice of
- // stressers concurrently.
- type compositeStresser struct {
- stressers []Stresser
- }
- func (cs *compositeStresser) Stress() error {
- for i, s := range cs.stressers {
- if err := s.Stress(); err != nil {
- for j := 0; j < i; j++ {
- cs.stressers[i].Close()
- }
- return err
- }
- }
- return nil
- }
- func (cs *compositeStresser) Pause() {
- var wg sync.WaitGroup
- wg.Add(len(cs.stressers))
- for i := range cs.stressers {
- go func(s Stresser) {
- defer wg.Done()
- s.Pause()
- }(cs.stressers[i])
- }
- wg.Wait()
- }
- func (cs *compositeStresser) Close() {
- var wg sync.WaitGroup
- wg.Add(len(cs.stressers))
- for i := range cs.stressers {
- go func(s Stresser) {
- defer wg.Done()
- s.Close()
- }(cs.stressers[i])
- }
- wg.Wait()
- }
- func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {
- for _, stress := range cs.stressers {
- modifiedKey += stress.ModifiedKeys()
- }
- return modifiedKey
- }
- func (cs *compositeStresser) Checker() Checker {
- var chks []Checker
- for _, s := range cs.stressers {
- if chk := s.Checker(); chk != nil {
- chks = append(chks, chk)
- }
- }
- if len(chks) == 0 {
- return nil
- }
- return newCompositeChecker(chks)
- }
- type stressConfig struct {
- keyLargeSize int
- keySize int
- keySuffixRange int
- numLeases int
- keysPerLease int
- rateLimiter *rate.Limiter
- etcdRunnerPath string
- }
- // NewStresser creates stresser from a comma separated list of stresser types.
- func NewStresser(s string, sc *stressConfig, m *member) Stresser {
- types := strings.Split(s, ",")
- if len(types) > 1 {
- stressers := make([]Stresser, len(types))
- for i, stype := range types {
- stressers[i] = NewStresser(stype, sc, m)
- }
- return &compositeStresser{stressers}
- }
- switch s {
- case "nop":
- return &nopStresser{start: time.Now(), qps: int(sc.rateLimiter.Limit())}
- case "keys":
- // TODO: Too intensive stressers can panic etcd member with
- // 'out of memory' error. Put rate limits in server side.
- return &keyStresser{
- Endpoint: m.grpcAddr(),
- keyLargeSize: sc.keyLargeSize,
- keySize: sc.keySize,
- keySuffixRange: sc.keySuffixRange,
- N: 100,
- rateLimiter: sc.rateLimiter,
- }
- case "v2keys":
- return &v2Stresser{
- Endpoint: m.ClientURL,
- keySize: sc.keySize,
- keySuffixRange: sc.keySuffixRange,
- N: 100,
- rateLimiter: sc.rateLimiter,
- }
- case "lease":
- return &leaseStresser{
- endpoint: m.grpcAddr(),
- numLeases: sc.numLeases,
- keysPerLease: sc.keysPerLease,
- rateLimiter: sc.rateLimiter,
- }
- case "election-runner":
- reqRate := 100
- args := []string{
- "election",
- fmt.Sprintf("%v", time.Now().UnixNano()), // election name as current nano time
- "--dial-timeout=10s",
- "--endpoints", m.grpcAddr(),
- "--total-client-connections=10",
- "--rounds=0", // runs forever
- "--req-rate", fmt.Sprintf("%v", reqRate),
- }
- return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
- case "watch-runner":
- reqRate := 100
- args := []string{
- "watcher",
- "--prefix", fmt.Sprintf("%v", time.Now().UnixNano()), // prefix all keys with nano time
- "--total-keys=1",
- "--total-prefixes=1",
- "--watch-per-prefix=1",
- "--endpoints", m.grpcAddr(),
- "--rounds=0", // runs forever
- "--req-rate", fmt.Sprintf("%v", reqRate),
- }
- return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
- case "lock-racer-runner":
- reqRate := 100
- args := []string{
- "lock-racer",
- fmt.Sprintf("%v", time.Now().UnixNano()), // locker name as current nano time
- "--endpoints", m.grpcAddr(),
- "--total-client-connections=10",
- "--rounds=0", // runs forever
- "--req-rate", fmt.Sprintf("%v", reqRate),
- }
- return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
- case "lease-runner":
- args := []string{
- "lease-renewer",
- "--ttl=30",
- "--endpoints", m.grpcAddr(),
- }
- return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, 0)
- default:
- plog.Panicf("unknown stresser type: %s\n", s)
- }
- return nil // never reach here
- }
|