Pārlūkot izejas kodu

Merge pull request #7812 from fanminshi/refactor_runner

etcd-runner: fix runner and minor refactoring.
fanmin shi 8 gadi atpakaļ
vecāks
revīzija
60b9adc267

+ 41 - 24
tools/functional-tester/etcd-runner/command/election_command.go

@@ -20,32 +20,34 @@ import (
 	"fmt"
 
 	"github.com/coreos/etcd/clientv3/concurrency"
+
 	"github.com/spf13/cobra"
 )
 
 // NewElectionCommand returns the cobra command for "election runner".
 func NewElectionCommand() *cobra.Command {
 	cmd := &cobra.Command{
-		Use:   "election",
+		Use:   "election [election name (defaults to 'elector')]",
 		Short: "Performs election operation",
 		Run:   runElectionFunc,
 	}
-	cmd.Flags().IntVar(&rounds, "rounds", 100, "number of rounds to run")
 	cmd.Flags().IntVar(&totalClientConnections, "total-client-connections", 10, "total number of client connections")
 	return cmd
 }
 
 func runElectionFunc(cmd *cobra.Command, args []string) {
-	if len(args) > 0 {
-		ExitWithError(ExitBadArgs, errors.New("election does not take any argument"))
+	election := "elector"
+	if len(args) == 1 {
+		election = args[0]
 	}
-
-	rcs := make([]roundClient, totalClientConnections)
-	validatec, releasec := make(chan struct{}, len(rcs)), make(chan struct{}, len(rcs))
-	for range rcs {
-		releasec <- struct{}{}
+	if len(args) > 1 {
+		ExitWithError(ExitBadArgs, errors.New("election takes at most one argument"))
 	}
 
+	rcs := make([]roundClient, totalClientConnections)
+	validatec := make(chan struct{}, len(rcs))
+	// nextc closes when election is ready for next round.
+	nextc := make(chan struct{})
 	eps := endpointsFromFlag(cmd)
 	dialTimeout := dialTimeoutFromCmd(cmd)
 
@@ -53,6 +55,10 @@ func runElectionFunc(cmd *cobra.Command, args []string) {
 		v := fmt.Sprintf("%d", i)
 		observedLeader := ""
 		validateWaiters := 0
+		var rcNextc chan struct{}
+		setRcNextc := func() {
+			rcNextc = nextc
+		}
 
 		rcs[i].c = newClient(eps, dialTimeout)
 		var (
@@ -65,18 +71,22 @@ func runElectionFunc(cmd *cobra.Command, args []string) {
 				break
 			}
 		}
-		e := concurrency.NewElection(s, "electors")
 
-		rcs[i].acquire = func() error {
-			<-releasec
+		e := concurrency.NewElection(s, election)
+		rcs[i].acquire = func() (err error) {
 			ctx, cancel := context.WithCancel(context.Background())
+			donec := make(chan struct{})
 			go func() {
-				if ol, ok := <-e.Observe(ctx); ok {
-					observedLeader = string(ol.Kvs[0].Value)
-					if observedLeader != v {
-						cancel()
+				defer close(donec)
+				for ctx.Err() == nil {
+					if ol, ok := <-e.Observe(ctx); ok {
+						observedLeader = string(ol.Kvs[0].Value)
+						break
 					}
 				}
+				if observedLeader != v {
+					cancel()
+				}
 			}()
 			err = e.Campaign(ctx, v)
 			if err == nil {
@@ -85,18 +95,24 @@ func runElectionFunc(cmd *cobra.Command, args []string) {
 			if observedLeader == v {
 				validateWaiters = len(rcs)
 			}
+			cancel()
+			<-donec
 			select {
 			case <-ctx.Done():
 				return nil
 			default:
-				cancel()
 				return err
 			}
 		}
 		rcs[i].validate = func() error {
-			if l, err := e.Leader(context.TODO()); err == nil && string(l.Kvs[0].Value) != observedLeader {
-				return fmt.Errorf("expected leader %q, got %q", observedLeader, l)
+			l, err := e.Leader(context.TODO())
+			if err == nil && string(l.Kvs[0].Value) != observedLeader {
+				return fmt.Errorf("expected leader %q, got %q", observedLeader, l.Kvs[0].Value)
+			}
+			if err != nil {
+				return err
 			}
+			setRcNextc()
 			validatec <- struct{}{}
 			return nil
 		}
@@ -113,14 +129,15 @@ func runElectionFunc(cmd *cobra.Command, args []string) {
 				return err
 			}
 			if observedLeader == v {
-				for range rcs {
-					releasec <- struct{}{}
-				}
+				close(nextc)
+				nextc = make(chan struct{})
 			}
+			<-rcNextc
 			observedLeader = ""
 			return nil
 		}
 	}
-
-	doRounds(rcs, rounds)
+	// each client creates 1 key from Campaign() and delete it from Resign()
+	// a round involves in 2*len(rcs) requests.
+	doRounds(rcs, rounds, 2*len(rcs))
 }

+ 17 - 18
tools/functional-tester/etcd-runner/command/global.go

@@ -15,6 +15,7 @@
 package command
 
 import (
+	"context"
 	"fmt"
 	"log"
 	"sync"
@@ -23,25 +24,18 @@ import (
 	"github.com/coreos/etcd/clientv3"
 
 	"github.com/spf13/cobra"
+	"golang.org/x/time/rate"
 )
 
+// shared flags
 var (
-	rounds                 int           // total number of rounds the operation needs to be performed
-	totalClientConnections int           // total number of client connections to be made with server
-	noOfPrefixes           int           // total number of prefixes which will be watched upon
-	watchPerPrefix         int           // number of watchers per prefix
-	reqRate                int           // put request per second
-	totalKeys              int           // total number of keys for operation
-	runningTime            time.Duration // time for which operation should be performed
+	totalClientConnections int // total number of client connections to be made with server
+	endpoints              []string
+	dialTimeout            time.Duration
+	rounds                 int // total number of rounds to run; set to <= 0 to run forever.
+	reqRate                int // maximum number of requests per second.
 )
 
-// GlobalFlags are flags that defined globally
-// and are inherited to all sub-commands.
-type GlobalFlags struct {
-	Endpoints   []string
-	DialTimeout time.Duration
-}
-
 type roundClient struct {
 	c        *clientv3.Client
 	progress int
@@ -61,16 +55,21 @@ func newClient(eps []string, timeout time.Duration) *clientv3.Client {
 	return c
 }
 
-func doRounds(rcs []roundClient, rounds int) {
+func doRounds(rcs []roundClient, rounds int, requests int) {
 	var mu sync.Mutex
 	var wg sync.WaitGroup
 
 	wg.Add(len(rcs))
 	finished := make(chan struct{})
+	limiter := rate.NewLimiter(rate.Limit(reqRate), reqRate)
 	for i := range rcs {
 		go func(rc *roundClient) {
 			defer wg.Done()
-			for rc.progress < rounds {
+			for rc.progress < rounds || rounds <= 0 {
+				if err := limiter.WaitN(context.Background(), requests/len(rcs)); err != nil {
+					log.Panicf("rate limiter error %v", err)
+				}
+
 				for rc.acquire() != nil { /* spin */
 				}
 
@@ -85,7 +84,7 @@ func doRounds(rcs []roundClient, rounds int) {
 				finished <- struct{}{}
 
 				mu.Lock()
-				for rc.release() != nil {
+				for rc.release() != nil { /* spin */
 					mu.Unlock()
 					mu.Lock()
 				}
@@ -95,7 +94,7 @@ func doRounds(rcs []roundClient, rounds int) {
 	}
 
 	start := time.Now()
-	for i := 1; i < len(rcs)*rounds+1; i++ {
+	for i := 1; i < len(rcs)*rounds+1 || rounds <= 0; i++ {
 		select {
 		case <-finished:
 			if i%100 == 0 {

+ 1 - 1
tools/functional-tester/etcd-runner/help.go → tools/functional-tester/etcd-runner/command/help.go

@@ -14,7 +14,7 @@
 
 // copied from https://github.com/rkt/rkt/blob/master/rkt/help.go
 
-package main
+package command
 
 import (
 	"bytes"

+ 9 - 3
tools/functional-tester/etcd-runner/command/lease_renewer_command.go

@@ -22,11 +22,16 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/clientv3"
+
 	"github.com/spf13/cobra"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 )
 
+var (
+	leaseTTL int64
+)
+
 // NewLeaseRenewerCommand returns the cobra command for "lease-renewer runner".
 func NewLeaseRenewerCommand() *cobra.Command {
 	cmd := &cobra.Command{
@@ -34,6 +39,7 @@ func NewLeaseRenewerCommand() *cobra.Command {
 		Short: "Performs lease renew operation",
 		Run:   runLeaseRenewerFunc,
 	}
+	cmd.Flags().Int64Var(&leaseTTL, "ttl", 5, "lease's ttl")
 	return cmd
 }
 
@@ -53,7 +59,7 @@ func runLeaseRenewerFunc(cmd *cobra.Command, args []string) {
 			err error
 		)
 		for {
-			l, err = c.Lease.Grant(ctx, 5)
+			l, err = c.Lease.Grant(ctx, leaseTTL)
 			if err == nil {
 				break
 			}
@@ -65,14 +71,14 @@ func runLeaseRenewerFunc(cmd *cobra.Command, args []string) {
 			err = lk.Err
 			if grpc.Code(err) == codes.NotFound {
 				if time.Since(expire) < 0 {
-					log.Printf("bad renew! exceeded: %v", time.Since(expire))
+					log.Fatalf("bad renew! exceeded: %v", time.Since(expire))
 					for {
 						lk = c.Lease.KeepAliveOnce(ctx, l.ID)
 						fmt.Println(lk)
 						time.Sleep(time.Second)
 					}
 				}
-				log.Printf("lost lease %d, expire: %v\n", l.ID, expire)
+				log.Fatalf("lost lease %d, expire: %v\n", l.ID, expire)
 				break
 			}
 			if err != nil {

+ 13 - 6
tools/functional-tester/etcd-runner/command/lock_racer_command.go

@@ -20,24 +20,29 @@ import (
 	"fmt"
 
 	"github.com/coreos/etcd/clientv3/concurrency"
+
 	"github.com/spf13/cobra"
 )
 
 // NewLockRacerCommand returns the cobra command for "lock-racer runner".
 func NewLockRacerCommand() *cobra.Command {
 	cmd := &cobra.Command{
-		Use:   "lock-racer",
+		Use:   "lock-racer [name of lock (defaults to 'racers')]",
 		Short: "Performs lock race operation",
 		Run:   runRacerFunc,
 	}
-	cmd.Flags().IntVar(&rounds, "rounds", 100, "number of rounds to run")
 	cmd.Flags().IntVar(&totalClientConnections, "total-client-connections", 10, "total number of client connections")
 	return cmd
 }
 
 func runRacerFunc(cmd *cobra.Command, args []string) {
-	if len(args) > 0 {
-		ExitWithError(ExitBadArgs, errors.New("lock-racer does not take any argument"))
+	racers := "racers"
+	if len(args) == 1 {
+		racers = args[0]
+	}
+
+	if len(args) > 1 {
+		ExitWithError(ExitBadArgs, errors.New("lock-racer takes at most one argument"))
 	}
 
 	rcs := make([]roundClient, totalClientConnections)
@@ -61,7 +66,7 @@ func runRacerFunc(cmd *cobra.Command, args []string) {
 				break
 			}
 		}
-		m := concurrency.NewMutex(s, "racers")
+		m := concurrency.NewMutex(s, racers)
 		rcs[i].acquire = func() error { return m.Lock(ctx) }
 		rcs[i].validate = func() error {
 			if cnt++; cnt != 1 {
@@ -77,5 +82,7 @@ func runRacerFunc(cmd *cobra.Command, args []string) {
 			return nil
 		}
 	}
-	doRounds(rcs, rounds)
+	// each client creates 1 key from NewMutex() and delete it from Unlock()
+	// a round involves in 2*len(rcs) requests.
+	doRounds(rcs, rounds, 2*len(rcs))
 }

+ 69 - 0
tools/functional-tester/etcd-runner/command/root.go

@@ -0,0 +1,69 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package command
+
+import (
+	"log"
+	"math/rand"
+	"time"
+
+	"github.com/spf13/cobra"
+)
+
+const (
+	cliName        = "etcd-runner"
+	cliDescription = "Stress tests using clientv3 functionality.."
+
+	defaultDialTimeout = 2 * time.Second
+)
+
+var (
+	rootCmd = &cobra.Command{
+		Use:        cliName,
+		Short:      cliDescription,
+		SuggestFor: []string{"etcd-runner"},
+	}
+)
+
+func init() {
+	cobra.EnablePrefixMatching = true
+
+	rand.Seed(time.Now().UnixNano())
+
+	log.SetFlags(log.Lmicroseconds)
+
+	rootCmd.PersistentFlags().StringSliceVar(&endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
+	rootCmd.PersistentFlags().DurationVar(&dialTimeout, "dial-timeout", defaultDialTimeout, "dial timeout for client connections")
+	rootCmd.PersistentFlags().IntVar(&reqRate, "req-rate", 30, "maximum number of requests per second")
+	rootCmd.PersistentFlags().IntVar(&rounds, "rounds", 100, "number of rounds to run; 0 to run forever")
+
+	rootCmd.AddCommand(
+		NewElectionCommand(),
+		NewLeaseRenewerCommand(),
+		NewLockRacerCommand(),
+		NewWatchCommand(),
+	)
+}
+
+func Start() {
+	rootCmd.SetUsageFunc(usageFunc)
+
+	// Make help just show the usage
+	rootCmd.SetHelpTemplate(`{{.UsageString}}`)
+
+	if err := rootCmd.Execute(); err != nil {
+		ExitWithError(ExitError, err)
+	}
+}

+ 18 - 9
tools/functional-tester/etcd-runner/command/watch_command.go

@@ -24,10 +24,19 @@ import (
 
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/pkg/stringutil"
+
 	"github.com/spf13/cobra"
 	"golang.org/x/time/rate"
 )
 
+var (
+	runningTime    time.Duration // time for which operation should be performed
+	noOfPrefixes   int           // total number of prefixes which will be watched upon
+	watchPerPrefix int           // number of watchers per prefix
+	watchPrefix    string        // prefix append to keys in watcher
+	totalKeys      int           // total number of keys for operation
+)
+
 // NewWatchCommand returns the cobra command for "watcher runner".
 func NewWatchCommand() *cobra.Command {
 	cmd := &cobra.Command{
@@ -35,12 +44,12 @@ func NewWatchCommand() *cobra.Command {
 		Short: "Performs watch operation",
 		Run:   runWatcherFunc,
 	}
-	cmd.Flags().IntVar(&rounds, "rounds", 100, "number of rounds to run")
 	cmd.Flags().DurationVar(&runningTime, "running-time", 60, "number of seconds to run")
+	cmd.Flags().StringVar(&watchPrefix, "prefix", "", "the prefix to append on all keys")
 	cmd.Flags().IntVar(&noOfPrefixes, "total-prefixes", 10, "total no of prefixes to use")
 	cmd.Flags().IntVar(&watchPerPrefix, "watch-per-prefix", 10, "number of watchers per prefix")
-	cmd.Flags().IntVar(&reqRate, "req-rate", 30, "rate at which put request will be performed")
 	cmd.Flags().IntVar(&totalKeys, "total-keys", 1000, "total number of keys to watch")
+
 	return cmd
 }
 
@@ -50,7 +59,7 @@ func runWatcherFunc(cmd *cobra.Command, args []string) {
 	}
 
 	ctx := context.Background()
-	for round := 0; round < rounds; round++ {
+	for round := 0; round < rounds || rounds <= 0; round++ {
 		fmt.Println("round", round)
 		performWatchOnPrefixes(ctx, cmd, round)
 	}
@@ -94,7 +103,7 @@ func performWatchOnPrefixes(ctx context.Context, cmd *cobra.Command, round int)
 				if err = limiter.Wait(ctxt); err != nil {
 					return
 				}
-				if err = putKeyAtMostOnce(ctxt, client, roundPrefix+"-"+prefix+"-"+key); err != nil {
+				if err = putKeyAtMostOnce(ctxt, client, watchPrefix+"-"+roundPrefix+"-"+prefix+"-"+key); err != nil {
 					log.Fatalf("failed to put key: %v", err)
 					return
 				}
@@ -112,15 +121,15 @@ func performWatchOnPrefixes(ctx context.Context, cmd *cobra.Command, round int)
 			rc := newClient(eps, dialTimeout)
 			rcs = append(rcs, rc)
 
-			watchPrefix := roundPrefix + "-" + prefix
+			wprefix := watchPrefix + "-" + roundPrefix + "-" + prefix
 
-			wc := rc.Watch(ctxc, watchPrefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
+			wc := rc.Watch(ctxc, wprefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
 			wcs = append(wcs, wc)
 
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
-				checkWatchResponse(wc, watchPrefix, keys)
+				checkWatchResponse(wc, wprefix, keys)
 			}()
 		}
 	}
@@ -139,7 +148,7 @@ func performWatchOnPrefixes(ctx context.Context, cmd *cobra.Command, round int)
 		rc.Close()
 	}
 
-	if err = deletePrefix(ctx, client, roundPrefix); err != nil {
+	if err = deletePrefix(ctx, client, watchPrefix); err != nil {
 		log.Fatalf("failed to clean up keys after test: %v", err)
 	}
 }
@@ -148,7 +157,7 @@ func checkWatchResponse(wc clientv3.WatchChan, prefix string, keys []string) {
 	for n := 0; n < len(keys); {
 		wr, more := <-wc
 		if !more {
-			log.Fatalf("expect more keys (received %d/%d) for %s", len(keys), n, prefix)
+			log.Fatalf("expect more keys (received %d/%d) for %s", n, len(keys), prefix)
 		}
 		for _, event := range wr.Events {
 			expectedKey := prefix + "-" + keys[n]

+ 2 - 55
tools/functional-tester/etcd-runner/main.go

@@ -15,61 +15,8 @@
 // etcd-runner is a command line application that performs tests on etcd.
 package main
 
-import (
-	"log"
-	"time"
-
-	"github.com/coreos/etcd/tools/functional-tester/etcd-runner/command"
-	"github.com/spf13/cobra"
-)
-
-const (
-	cliName        = "etcd-runner"
-	cliDescription = "Stress tests using clientv3 functionality.."
-
-	defaultDialTimeout = 2 * time.Second
-)
-
-var (
-	globalFlags = command.GlobalFlags{}
-)
-
-var (
-	rootCmd = &cobra.Command{
-		Use:        cliName,
-		Short:      cliDescription,
-		SuggestFor: []string{"etcd-runner"},
-	}
-)
-
-func init() {
-	log.SetFlags(log.Lmicroseconds)
-	rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
-	rootCmd.PersistentFlags().DurationVar(&globalFlags.DialTimeout, "dial-timeout", defaultDialTimeout, "dial timeout for client connections")
-
-	rootCmd.AddCommand(
-		command.NewElectionCommand(),
-		command.NewLeaseRenewerCommand(),
-		command.NewLockRacerCommand(),
-		command.NewWatchCommand(),
-	)
-}
-
-func init() {
-	cobra.EnablePrefixMatching = true
-}
-
-func Start() {
-	rootCmd.SetUsageFunc(usageFunc)
-
-	// Make help just show the usage
-	rootCmd.SetHelpTemplate(`{{.UsageString}}`)
-
-	if err := rootCmd.Execute(); err != nil {
-		command.ExitWithError(command.ExitError, err)
-	}
-}
+import "github.com/coreos/etcd/tools/functional-tester/etcd-runner/command"
 
 func main() {
-	Start()
+	command.Start()
 }