etcd-tester: add rate limiter to stresser

Gyu-Ho Lee, 9 years ago
commit 7c39f41e7c

+ 4 - 1
tools/functional-tester/etcd-tester/cluster.go

@@ -36,6 +36,7 @@ type cluster struct {
 	v2Only bool // to be deprecated

 	datadir              string
+	stressQPS            int
 	stressKeySize        int
 	stressKeySuffixRange int

@@ -50,10 +51,11 @@ type ClusterStatus struct {
 }

 // newCluster starts and returns a new cluster. The caller should call Terminate when finished, to shut it down.
-func newCluster(agentEndpoints []string, datadir string, stressKeySize, stressKeySuffixRange int, isV2Only bool) (*cluster, error) {
+func newCluster(agentEndpoints []string, datadir string, stressQPS, stressKeySize, stressKeySuffixRange int, isV2Only bool) (*cluster, error) {
 	c := &cluster{
 		v2Only:               isV2Only,
 		datadir:              datadir,
+		stressQPS:            stressQPS,
 		stressKeySize:        stressKeySize,
 		stressKeySuffixRange: stressKeySuffixRange,
 	}
@@ -123,6 +125,7 @@ func (c *cluster) bootstrap(agentEndpoints []string) error {
 				Endpoint:       m.grpcAddr(),
 				KeySize:        c.stressKeySize,
 				KeySuffixRange: c.stressKeySuffixRange,
+				qps:            c.stressQPS,
 				N:              stressN,
 			}
 		}

+ 2 - 1
tools/functional-tester/etcd-tester/main.go

@@ -32,13 +32,14 @@ func main() {
 	stressKeySize := flag.Int("stress-key-size", 100, "the size of each key written into etcd.")
 	stressKeySuffixRange := flag.Int("stress-key-count", 250000, "the count of key range written into etcd.")
 	limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
+	stressQPS := flag.Int("stress-qps", 5000, "maximum number of stresser requests per second.")
 	schedCases := flag.String("schedule-cases", "", "test case schedule")
 	consistencyCheck := flag.Bool("consistency-check", true, "true to check consistency (revision, hash)")
 	isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.")
 	flag.Parse()

 	endpoints := strings.Split(*endpointStr, ",")
-	c, err := newCluster(endpoints, *datadir, *stressKeySize, *stressKeySuffixRange, *isV2Only)
+	c, err := newCluster(endpoints, *datadir, *stressQPS, *stressKeySize, *stressKeySuffixRange, *isV2Only)
 	if err != nil {
 		plog.Fatal(err)
 	}

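For reference, the -stress-qps flag added above caps the stresser's request rate through golang.org/x/time/rate (wired up in stresser.go below). A minimal standalone sketch of that package's constructor, using an illustrative qps value rather than the tester's flag plumbing:

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	qps := 5000 // matches the default of the -stress-qps flag above

	// rate.NewLimiter(r, b) permits events at rate r with bursts of up to b.
	// rate.Every(time.Second) is a rate of one event per second, so this
	// limiter admits a burst of qps events and then refills one token per
	// second; rate.Limit(qps) would instead refill qps tokens per second.
	l := rate.NewLimiter(rate.Every(time.Second), qps)
	fmt.Println("limit:", l.Limit(), "burst:", l.Burst())
}
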
+ 74 - 61
tools/functional-tester/etcd-tester/stresser.go

@@ -27,6 +27,7 @@ import (
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
 	"golang.org/x/net/context"
+	"golang.org/x/time/rate"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/transport"
 	"google.golang.org/grpc/transport"
@@ -51,10 +52,14 @@ type stresser struct {
 	KeySize        int
 	KeySuffixRange int

-	N int
+	qps int
+	N   int
+
+	mu sync.Mutex
+	wg *sync.WaitGroup
+
+	rateLimiter *rate.Limiter

-	mu     sync.Mutex
-	wg     *sync.WaitGroup
 	cancel func()
 	conn   *grpc.ClientConn

@@ -77,75 +82,83 @@ func (s *stresser) Stress() error {
 	s.conn = conn
 	s.cancel = cancel
 	s.wg = wg
+	s.rateLimiter = rate.NewLimiter(rate.Every(time.Second), s.qps)
 	s.mu.Unlock()

 	kvc := pb.NewKVClient(conn)

 	for i := 0; i < s.N; i++ {
-		go func(i int) {
-			defer wg.Done()
-			for {
-				// TODO: 10-second is enough timeout to cover leader failure
-				// and immediate leader election. Find out what other cases this
-				// could be timed out.
-				putctx, putcancel := context.WithTimeout(ctx, 10*time.Second)
-				_, err := kvc.Put(putctx, &pb.PutRequest{
-					Key:   []byte(fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange))),
-					Value: []byte(randStr(s.KeySize)),
-				})
-				putcancel()
-				if err != nil {
-					shouldContinue := false
-					switch grpc.ErrorDesc(err) {
-					case context.DeadlineExceeded.Error():
-						// This retries when request is triggered at the same time as
-						// leader failure. When we terminate the leader, the request to
-						// that leader cannot be processed, and times out. Also requests
-						// to followers cannot be forwarded to the old leader, so timing out
-						// as well. We want to keep stressing until the cluster elects a
-						// new leader and start processing requests again.
-						shouldContinue = true
-
-					case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
-						// This retries when request is triggered at the same time as
-						// leader failure and follower nodes receive time out errors
-						// from losing their leader. Followers should retry to connect
-						// to the new leader.
-						shouldContinue = true
-
-					case etcdserver.ErrStopped.Error():
-						// one of the etcd nodes stopped from failure injection
-						shouldContinue = true
-
-					case transport.ErrConnClosing.Desc:
-						// server closed the transport (failure injected node)
-						shouldContinue = true
-
-					case rpctypes.ErrNotCapable.Error():
-						// capability check has not been done (in the beginning)
-						shouldContinue = true
-
-						// default:
-						// errors from stresser.Cancel method:
-						// rpc error: code = 1 desc = context canceled (type grpc.rpcError)
-						// rpc error: code = 2 desc = grpc: the client connection is closing (type grpc.rpcError)
-					}
-					if shouldContinue {
-						continue
-					}
-					return
-				}
-				s.mu.Lock()
-				s.success++
-				s.mu.Unlock()
-			}
-		}(i)
+		go s.run(ctx, kvc)
 	}

 	<-ctx.Done()
 	return nil
 }

+func (s *stresser) run(ctx context.Context, kvc pb.KVClient) {
+	defer s.wg.Done()
+
+	for {
+		if err := s.rateLimiter.Wait(ctx); err == context.Canceled {
+			return
+		}
+
+		// TODO: 10-second is enough timeout to cover leader failure
+		// and immediate leader election. Find out what other cases this
+		// could be timed out.
+		putctx, putcancel := context.WithTimeout(ctx, 10*time.Second)
+		_, err := kvc.Put(putctx, &pb.PutRequest{
+			Key:   []byte(fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange))),
+			Value: []byte(randStr(s.KeySize)),
+		})
+		putcancel()
+		if err != nil {
+			shouldContinue := false
+			switch grpc.ErrorDesc(err) {
+			case context.DeadlineExceeded.Error():
+				// This retries when request is triggered at the same time as
+				// leader failure. When we terminate the leader, the request to
+				// that leader cannot be processed, and times out. Also requests
+				// to followers cannot be forwarded to the old leader, so timing out
+				// as well. We want to keep stressing until the cluster elects a
+				// new leader and start processing requests again.
+				shouldContinue = true
+
+			case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
+				// This retries when request is triggered at the same time as
+				// leader failure and follower nodes receive time out errors
+				// from losing their leader. Followers should retry to connect
+				// to the new leader.
+				shouldContinue = true
+
+			case etcdserver.ErrStopped.Error():
+				// one of the etcd nodes stopped from failure injection
+				shouldContinue = true
+
+			case transport.ErrConnClosing.Desc:
+				// server closed the transport (failure injected node)
+				shouldContinue = true
+
+			case rpctypes.ErrNotCapable.Error():
+				// capability check has not been done (in the beginning)
+				shouldContinue = true
+
+				// default:
+				// errors from stresser.Cancel method:
+				// rpc error: code = 1 desc = context canceled (type grpc.rpcError)
+				// rpc error: code = 2 desc = grpc: the client connection is closing (type grpc.rpcError)
+			}
+			if shouldContinue {
+				continue
+			}
+			return
+		}
+		s.mu.Lock()
+		s.success++
+		s.mu.Unlock()
+	}
+}
+
 func (s *stresser) Cancel() {
 	s.mu.Lock()
 	cancel, conn, wg := s.cancel, s.conn, s.wg
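
The new run method above throttles each goroutine through rateLimiter.Wait before issuing a Put. A self-contained sketch of that pattern, with illustrative limiter parameters and a stand-in loop body instead of the tester's kvc.Put:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Illustrative limiter: 10 requests per second with a burst of 10.
	limiter := rate.NewLimiter(rate.Limit(10), 10)

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	for i := 0; ; i++ {
		// Wait blocks until the limiter grants a token; it returns a non-nil
		// error once ctx is canceled or its deadline makes further waiting
		// impossible, which ends the loop. This is the same exit path the new
		// run method relies on when the stresser's context is canceled.
		if err := limiter.Wait(ctx); err != nil {
			fmt.Println("stopping:", err)
			return
		}
		fmt.Println("request", i)
	}
}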