浏览代码

Merge pull request #7672 from fanminshi/integrate_runner_to_tester

etcd-tester: integrate etcd runner into etcd tester
fanmin shi 8 年之前
父节点
当前提交
c49a87bd04

+ 1 - 0
tools/functional-tester/build

@@ -7,4 +7,5 @@ fi
 
 CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s" -o bin/etcd-agent ./cmd/tools/functional-tester/etcd-agent
 CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s" -o bin/etcd-tester ./cmd/tools/functional-tester/etcd-tester
+CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s" -o bin/etcd-runner ./cmd/tools/functional-tester/etcd-runner
 

+ 13 - 0
tools/functional-tester/etcd-tester/checks.go

@@ -245,6 +245,19 @@ func (cchecker *compositeChecker) Check() error {
 	return errsToError(errs)
 }
 
+type runnerChecker struct {
+	errc chan error
+}
+
+func (rc *runnerChecker) Check() error {
+	select {
+	case err := <-rc.errc:
+		return err
+	default:
+		return nil
+	}
+}
+
 type noChecker struct{}
 
 func newNoChecker() Checker        { return &noChecker{} }

+ 97 - 0
tools/functional-tester/etcd-tester/etcd_runner_stresser.go

@@ -0,0 +1,97 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os/exec"
+	"syscall"
+
+	"golang.org/x/time/rate"
+)
+
+type runnerStresser struct {
+	cmd     *exec.Cmd
+	cmdStr  string
+	args    []string
+	rl      *rate.Limiter
+	reqRate int
+
+	errc  chan error
+	donec chan struct{}
+}
+
+func newRunnerStresser(cmdStr string, args []string, rl *rate.Limiter, reqRate int) *runnerStresser {
+	rl.SetLimit(rl.Limit() - rate.Limit(reqRate))
+	return &runnerStresser{
+		cmdStr:  cmdStr,
+		args:    args,
+		rl:      rl,
+		reqRate: reqRate,
+		errc:    make(chan error, 1),
+		donec:   make(chan struct{}),
+	}
+}
+
+func (rs *runnerStresser) setupOnce() (err error) {
+	if rs.cmd != nil {
+		return nil
+	}
+
+	rs.cmd = exec.Command(rs.cmdStr, rs.args...)
+	stderr, err := rs.cmd.StderrPipe()
+	if err != nil {
+		return err
+	}
+
+	go func() {
+		defer close(rs.donec)
+		out, err := ioutil.ReadAll(stderr)
+		if err != nil {
+			rs.errc <- err
+		} else {
+			rs.errc <- fmt.Errorf("(%v %v) stderr %v", rs.cmdStr, rs.args, string(out))
+		}
+	}()
+
+	return rs.cmd.Start()
+}
+
+func (rs *runnerStresser) Stress() (err error) {
+	if err = rs.setupOnce(); err != nil {
+		return err
+	}
+	return syscall.Kill(rs.cmd.Process.Pid, syscall.SIGCONT)
+}
+
+func (rs *runnerStresser) Pause() {
+	syscall.Kill(rs.cmd.Process.Pid, syscall.SIGSTOP)
+}
+
+func (rs *runnerStresser) Close() {
+	syscall.Kill(rs.cmd.Process.Pid, syscall.SIGINT)
+	rs.cmd.Wait()
+	<-rs.donec
+	rs.rl.SetLimit(rs.rl.Limit() + rate.Limit(rs.reqRate))
+}
+
+func (rs *runnerStresser) ModifiedKeys() int64 {
+	return 1
+}
+
+func (rs *runnerStresser) Checker() Checker {
+	return &runnerChecker{rs.errc}
+}

+ 7 - 2
tools/functional-tester/etcd-tester/key_stresser.go

@@ -140,11 +140,16 @@ func (s *keyStresser) run(ctx context.Context) {
 	}
 }
 
-func (s *keyStresser) Cancel() {
+func (s *keyStresser) Pause() {
+	s.Close()
+}
+
+func (s *keyStresser) Close() {
 	s.cancel()
 	s.conn.Close()
 	s.wg.Wait()
-	plog.Infof("keyStresser %q is canceled", s.Endpoint)
+	plog.Infof("keyStresser %q is closed", s.Endpoint)
+
 }
 
 func (s *keyStresser) ModifiedKeys() int64 {

+ 7 - 3
tools/functional-tester/etcd-tester/lease_stresser.go

@@ -361,13 +361,17 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
 	return false, ls.ctx.Err()
 }
 
-func (ls *leaseStresser) Cancel() {
-	plog.Debugf("lease stresser %q is canceling...", ls.endpoint)
+func (ls *leaseStresser) Pause() {
+	ls.Close()
+}
+
+func (ls *leaseStresser) Close() {
+	plog.Debugf("lease stresser %q is closing...", ls.endpoint)
 	ls.cancel()
 	ls.runWg.Wait()
 	ls.aliveWg.Wait()
 	ls.conn.Close()
-	plog.Infof("lease stresser %q is canceled", ls.endpoint)
+	plog.Infof("lease stresser %q is closed", ls.endpoint)
 }
 
 func (ls *leaseStresser) ModifiedKeys() int64 {

+ 4 - 1
tools/functional-tester/etcd-tester/main.go

@@ -51,7 +51,8 @@ func main() {
 	stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
 	schedCases := flag.String("schedule-cases", "", "test case schedule")
 	consistencyCheck := flag.Bool("consistency-check", true, "true to check consistency (revision, hash)")
-	stresserType := flag.String("stresser", "keys,lease", "comma separated list of stressers (keys, lease, v2keys, nop).")
+	stresserType := flag.String("stresser", "keys,lease", "comma separated list of stressers (keys, lease, v2keys, nop, election-runner, watch-runner, lock-racer-runner, lease-runner).")
+	etcdRunnerPath := flag.String("etcd-runner", "", "specify a path of etcd runner binary")
 	failureTypes := flag.String("failures", "default,failpoints", "specify failures (concat of \"default\" and \"failpoints\").")
 	externalFailures := flag.String("external-failures", "", "specify a path of script for enabling/disabling an external fault injector")
 	enablePprof := flag.Bool("enable-pprof", false, "true to enable pprof")
@@ -120,6 +121,8 @@ func main() {
 		keySuffixRange: int(*stressKeySuffixRange),
 		numLeases:      10,
 		keysPerLease:   10,
+
+		etcdRunnerPath: *etcdRunnerPath,
 	}
 
 	t := &tester{

+ 67 - 6
tools/functional-tester/etcd-tester/stresser.go

@@ -15,6 +15,7 @@
 package main
 
 import (
+	"fmt"
 	"strings"
 	"sync"
 	"time"
@@ -28,8 +29,10 @@ func init() { grpclog.SetLogger(plog) }
 type Stresser interface {
 	// Stress starts to stress the etcd cluster
 	Stress() error
-	// Cancel cancels the stress test on the etcd cluster
-	Cancel()
+	// Pause stops the stresser from sending requests to etcd. Resume by calling Stress.
+	Pause()
+	// Close releases all of the Stresser's resources.
+	Close()
 	// ModifiedKeys reports the number of keys created and deleted by stresser
 	ModifiedKeys() int64
 	// Checker returns an invariant checker for after the stresser is canceled.
@@ -43,7 +46,8 @@ type nopStresser struct {
 }
 
 func (s *nopStresser) Stress() error { return nil }
-func (s *nopStresser) Cancel()       {}
+func (s *nopStresser) Pause()        {}
+func (s *nopStresser) Close()        {}
 func (s *nopStresser) ModifiedKeys() int64 {
 	return 0
 }
@@ -59,7 +63,7 @@ func (cs *compositeStresser) Stress() error {
 	for i, s := range cs.stressers {
 		if err := s.Stress(); err != nil {
 			for j := 0; j < i; j++ {
-				cs.stressers[i].Cancel()
+				cs.stressers[i].Close()
 			}
 			return err
 		}
@@ -67,13 +71,25 @@ func (cs *compositeStresser) Stress() error {
 	return nil
 }
 
-func (cs *compositeStresser) Cancel() {
+func (cs *compositeStresser) Pause() {
 	var wg sync.WaitGroup
 	wg.Add(len(cs.stressers))
 	for i := range cs.stressers {
 		go func(s Stresser) {
 			defer wg.Done()
-			s.Cancel()
+			s.Pause()
+		}(cs.stressers[i])
+	}
+	wg.Wait()
+}
+
+func (cs *compositeStresser) Close() {
+	var wg sync.WaitGroup
+	wg.Add(len(cs.stressers))
+	for i := range cs.stressers {
+		go func(s Stresser) {
+			defer wg.Done()
+			s.Close()
 		}(cs.stressers[i])
 	}
 	wg.Wait()
@@ -108,6 +124,8 @@ type stressConfig struct {
 	keysPerLease int
 
 	rateLimiter *rate.Limiter
+
+	etcdRunnerPath string
 }
 
 // NewStresser creates stresser from a comma separated list of stresser types.
@@ -149,6 +167,49 @@ func NewStresser(s string, sc *stressConfig, m *member) Stresser {
 			keysPerLease: sc.keysPerLease,
 			rateLimiter:  sc.rateLimiter,
 		}
+	case "election-runner":
+		reqRate := 100
+		args := []string{
+			"election",
+			fmt.Sprintf("%v", time.Now().UnixNano()), // election name as current nano time
+			"--dial-timeout=10s",
+			"--endpoints", m.grpcAddr(),
+			"--total-client-connections=10",
+			"--rounds=0", // runs forever
+			"--req-rate", fmt.Sprintf("%v", reqRate),
+		}
+		return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
+	case "watch-runner":
+		reqRate := 100
+		args := []string{
+			"watcher",
+			"--prefix", fmt.Sprintf("%v", time.Now().UnixNano()), // prefix all keys with nano time
+			"--total-keys=1",
+			"--total-prefixes=1",
+			"--watch-per-prefix=1",
+			"--endpoints", m.grpcAddr(),
+			"--rounds=0", // runs forever
+			"--req-rate", fmt.Sprintf("%v", reqRate),
+		}
+		return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
+	case "lock-racer-runner":
+		reqRate := 100
+		args := []string{
+			"lock-racer",
+			fmt.Sprintf("%v", time.Now().UnixNano()), // locker name as current nano time
+			"--endpoints", m.grpcAddr(),
+			"--total-client-connections=10",
+			"--rounds=0", // runs forever
+			"--req-rate", fmt.Sprintf("%v", reqRate),
+		}
+		return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
+	case "lease-runner":
+		args := []string{
+			"lease-renewer",
+			"--ttl=30",
+			"--endpoints", m.grpcAddr(),
+		}
+		return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, 0)
 	default:
 		plog.Panicf("unknown stresser type: %s\n", s)
 	}

+ 13 - 7
tools/functional-tester/etcd-tester/tester.go

@@ -114,7 +114,7 @@ func (tt *tester) doRound(round int) error {
 			return fmt.Errorf("recovery error: %v", err)
 		}
 		plog.Infof("%s recovered failure", tt.logPrefix())
-		tt.cancelStresser()
+		tt.pauseStresser()
 		plog.Infof("%s wait until cluster is healthy", tt.logPrefix())
 		if err := tt.cluster.WaitHealth(); err != nil {
 			return fmt.Errorf("wait full health error: %v", err)
@@ -161,7 +161,7 @@ func (tt *tester) checkConsistency() (err error) {
 }
 
 func (tt *tester) compact(rev int64, timeout time.Duration) (err error) {
-	tt.cancelStresser()
+	tt.pauseStresser()
 	defer func() {
 		if err == nil {
 			err = tt.startStresser()
@@ -217,7 +217,7 @@ func (tt *tester) cleanup() error {
 	}
 	caseFailedTotalCounter.WithLabelValues(desc).Inc()
 
-	tt.cancelStresser()
+	tt.closeStresser()
 	if err := tt.cluster.Cleanup(); err != nil {
 		plog.Warningf("%s cleanup error: %v", tt.logPrefix(), err)
 		return err
@@ -229,10 +229,10 @@ func (tt *tester) cleanup() error {
 	return tt.resetStressCheck()
 }
 
-func (tt *tester) cancelStresser() {
-	plog.Infof("%s canceling the stressers...", tt.logPrefix())
-	tt.stresser.Cancel()
-	plog.Infof("%s canceled stressers", tt.logPrefix())
+func (tt *tester) pauseStresser() {
+	plog.Infof("%s pausing the stressers...", tt.logPrefix())
+	tt.stresser.Pause()
+	plog.Infof("%s paused stressers", tt.logPrefix())
 }
 
 func (tt *tester) startStresser() (err error) {
@@ -242,6 +242,12 @@ func (tt *tester) startStresser() (err error) {
 	return err
 }
 
+func (tt *tester) closeStresser() {
+	plog.Infof("%s closing the stressers...", tt.logPrefix())
+	tt.stresser.Close()
+	plog.Infof("%s closed stressers", tt.logPrefix())
+}
+
 func (tt *tester) resetStressCheck() error {
 	plog.Infof("%s resetting stressers and checkers...", tt.logPrefix())
 	cs := &compositeStresser{}

+ 5 - 1
tools/functional-tester/etcd-tester/v2_stresser.go

@@ -93,11 +93,15 @@ func (s *v2Stresser) run(ctx context.Context, kv clientV2.KeysAPI) {
 	}
 }
 
-func (s *v2Stresser) Cancel() {
+func (s *v2Stresser) Pause() {
 	s.cancel()
 	s.wg.Wait()
 }
 
+func (s *v2Stresser) Close() {
+	s.Pause()
+}
+
 func (s *v2Stresser) ModifiedKeys() int64 {
 	return atomic.LoadInt64(&s.atomicModifiedKey)
 }