Browse Source

functional-tester/tester: expects no error in NO_FAIL_WITH_STRESS

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 years ago
parent
commit
3ae47a8619

+ 17 - 2
tools/functional-tester/tester/cluster_tester.go

@@ -135,7 +135,8 @@ func (clus *Cluster) doRound() error {
 		}
 		}
 
 
 		stressStarted := false
 		stressStarted := false
-		if fa.FailureCase() != rpcpb.FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS {
+		fcase := fa.FailureCase()
+		if fcase != rpcpb.FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS {
 			clus.lg.Info(
 			clus.lg.Info(
 				"starting stressers before injecting failures",
 				"starting stressers before injecting failures",
 				zap.Int("round", clus.rd),
 				zap.Int("round", clus.rd),
@@ -173,7 +174,21 @@ func (clus *Cluster) doRound() error {
 
 
 		if stressStarted {
 		if stressStarted {
 			clus.lg.Info("pausing stresser after failure recovery, before wait health")
 			clus.lg.Info("pausing stresser after failure recovery, before wait health")
-			clus.stresser.Pause()
+			ems := clus.stresser.Pause()
+			if fcase == rpcpb.FailureCase_NO_FAIL_WITH_STRESS && len(ems) > 0 {
+				ess := make([]string, 0, len(ems))
+				cnt := 0
+				for k, v := range ems {
+					ess = append(ess, fmt.Sprintf("%s (count %d)", k, v))
+					cnt += v
+				}
+				clus.lg.Warn(
+					"expected no errors",
+					zap.String("desc", fa.Desc()),
+					zap.Strings("errors", ess),
+				)
+				return fmt.Errorf("expected no error in %q, got %q", fcase.String(), ess)
+			}
 		}
 		}
 
 
 		clus.lg.Info("wait health after recover")
 		clus.lg.Info("wait health after recover")

+ 2 - 2
tools/functional-tester/tester/stress.go

@@ -28,9 +28,9 @@ type Stresser interface {
 	// Stress starts to stress the etcd cluster
 	// Stress starts to stress the etcd cluster
 	Stress() error
 	Stress() error
 	// Pause stops the stresser from sending requests to etcd. Resume by calling Stress.
 	// Pause stops the stresser from sending requests to etcd. Resume by calling Stress.
-	Pause()
+	Pause() map[string]int
 	// Close releases all of the Stresser's resources.
 	// Close releases all of the Stresser's resources.
-	Close()
+	Close() map[string]int
 	// ModifiedKeys reports the number of keys created and deleted by stresser
 	// ModifiedKeys reports the number of keys created and deleted by stresser
 	ModifiedKeys() int64
 	ModifiedKeys() int64
 	// Checker returns an invariant checker for after the stresser is canceled.
 	// Checker returns an invariant checker for after the stresser is canceled.

+ 14 - 4
tools/functional-tester/tester/stress_composite.go

@@ -34,28 +34,38 @@ func (cs *compositeStresser) Stress() error {
 	return nil
 	return nil
 }
 }
 
 
-func (cs *compositeStresser) Pause() {
+func (cs *compositeStresser) Pause() (ems map[string]int) {
+	ems = make(map[string]int)
 	var wg sync.WaitGroup
 	var wg sync.WaitGroup
 	wg.Add(len(cs.stressers))
 	wg.Add(len(cs.stressers))
 	for i := range cs.stressers {
 	for i := range cs.stressers {
 		go func(s Stresser) {
 		go func(s Stresser) {
 			defer wg.Done()
 			defer wg.Done()
-			s.Pause()
+			errs := s.Pause()
+			for k, v := range errs {
+				ems[k] += v
+			}
 		}(cs.stressers[i])
 		}(cs.stressers[i])
 	}
 	}
 	wg.Wait()
 	wg.Wait()
+	return ems
 }
 }
 
 
-func (cs *compositeStresser) Close() {
+func (cs *compositeStresser) Close() (ems map[string]int) {
+	ems = make(map[string]int)
 	var wg sync.WaitGroup
 	var wg sync.WaitGroup
 	wg.Add(len(cs.stressers))
 	wg.Add(len(cs.stressers))
 	for i := range cs.stressers {
 	for i := range cs.stressers {
 		go func(s Stresser) {
 		go func(s Stresser) {
 			defer wg.Done()
 			defer wg.Done()
-			s.Close()
+			errs := s.Close()
+			for k, v := range errs {
+				ems[k] += v
+			}
 		}(cs.stressers[i])
 		}(cs.stressers[i])
 	}
 	}
 	wg.Wait()
 	wg.Wait()
+	return ems
 }
 }
 
 
 func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {
 func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {

+ 25 - 3
tools/functional-tester/tester/stress_key.go

@@ -53,6 +53,10 @@ type keyStresser struct {
 	cancel func()
 	cancel func()
 	cli    *clientv3.Client
 	cli    *clientv3.Client
 
 
+	emu    sync.RWMutex
+	ems    map[string]int
+	paused bool
+
 	// atomicModifiedKeys records the number of keys created and deleted by the stresser.
 	// atomicModifiedKeys records the number of keys created and deleted by the stresser.
 	atomicModifiedKeys int64
 	atomicModifiedKeys int64
 
 
@@ -89,6 +93,10 @@ func (s *keyStresser) Stress() error {
 	}
 	}
 	s.stressTable = createStressTable(stressEntries)
 	s.stressTable = createStressTable(stressEntries)
 
 
+	s.emu.Lock()
+	s.paused = false
+	s.ems = make(map[string]int, 100)
+	s.emu.Unlock()
 	for i := 0; i < s.clientsN; i++ {
 	for i := 0; i < s.clientsN; i++ {
 		go s.run()
 		go s.run()
 	}
 	}
@@ -154,14 +162,21 @@ func (s *keyStresser) run() {
 			)
 			)
 			return
 			return
 		}
 		}
+
+		// only record errors before pausing stressers
+		s.emu.Lock()
+		if !s.paused {
+			s.ems[err.Error()]++
+		}
+		s.emu.Unlock()
 	}
 	}
 }
 }
 
 
-func (s *keyStresser) Pause() {
-	s.Close()
+func (s *keyStresser) Pause() map[string]int {
+	return s.Close()
 }
 }
 
 
-func (s *keyStresser) Close() {
+func (s *keyStresser) Close() map[string]int {
 	s.cancel()
 	s.cancel()
 	s.cli.Close()
 	s.cli.Close()
 	s.wg.Wait()
 	s.wg.Wait()
@@ -170,6 +185,13 @@ func (s *keyStresser) Close() {
 		"key stresser is closed",
 		"key stresser is closed",
 		zap.String("endpoint", s.m.EtcdClientEndpoint),
 		zap.String("endpoint", s.m.EtcdClientEndpoint),
 	)
 	)
+
+	s.emu.Lock()
+	s.paused = true
+	ess := s.ems
+	s.ems = make(map[string]int, 100)
+	s.emu.Unlock()
+	return ess
 }
 }
 
 
 func (s *keyStresser) ModifiedKeys() int64 {
 func (s *keyStresser) ModifiedKeys() int64 {

+ 4 - 3
tools/functional-tester/tester/stress_lease.go

@@ -447,11 +447,11 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
 	return false, ls.ctx.Err()
 	return false, ls.ctx.Err()
 }
 }
 
 
-func (ls *leaseStresser) Pause() {
-	ls.Close()
+func (ls *leaseStresser) Pause() map[string]int {
+	return ls.Close()
 }
 }
 
 
-func (ls *leaseStresser) Close() {
+func (ls *leaseStresser) Close() map[string]int {
 	ls.lg.Info(
 	ls.lg.Info(
 		"lease stresser is closing",
 		"lease stresser is closing",
 		zap.String("endpoint", ls.m.EtcdClientEndpoint),
 		zap.String("endpoint", ls.m.EtcdClientEndpoint),
@@ -464,6 +464,7 @@ func (ls *leaseStresser) Close() {
 		"lease stresser is closed",
 		"lease stresser is closed",
 		zap.String("endpoint", ls.m.EtcdClientEndpoint),
 		zap.String("endpoint", ls.m.EtcdClientEndpoint),
 	)
 	)
+	return nil
 }
 }
 
 
 func (ls *leaseStresser) ModifiedKeys() int64 {
 func (ls *leaseStresser) ModifiedKeys() int64 {

+ 4 - 2
tools/functional-tester/tester/stress_runner.go

@@ -77,15 +77,17 @@ func (rs *runnerStresser) Stress() (err error) {
 	return syscall.Kill(rs.cmd.Process.Pid, syscall.SIGCONT)
 	return syscall.Kill(rs.cmd.Process.Pid, syscall.SIGCONT)
 }
 }
 
 
-func (rs *runnerStresser) Pause() {
+func (rs *runnerStresser) Pause() map[string]int {
 	syscall.Kill(rs.cmd.Process.Pid, syscall.SIGSTOP)
 	syscall.Kill(rs.cmd.Process.Pid, syscall.SIGSTOP)
+	return nil
 }
 }
 
 
-func (rs *runnerStresser) Close() {
+func (rs *runnerStresser) Close() map[string]int {
 	syscall.Kill(rs.cmd.Process.Pid, syscall.SIGINT)
 	syscall.Kill(rs.cmd.Process.Pid, syscall.SIGINT)
 	rs.cmd.Wait()
 	rs.cmd.Wait()
 	<-rs.donec
 	<-rs.donec
 	rs.rl.SetLimit(rs.rl.Limit() + rate.Limit(rs.reqRate))
 	rs.rl.SetLimit(rs.rl.Limit() + rate.Limit(rs.reqRate))
+	return nil
 }
 }
 
 
 func (rs *runnerStresser) ModifiedKeys() int64 {
 func (rs *runnerStresser) ModifiedKeys() int64 {