Browse Source

functional-tester/tester: add network fault test cases with snapshot trigger

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 years ago
parent
commit
f4cd33b83c

+ 11 - 0
tools/functional-tester/tester/checks.go

@@ -104,6 +104,16 @@ type leaseChecker struct {
 }
 
 func (lc *leaseChecker) Check() error {
+	if lc.ls == nil {
+		return nil
+	}
+	if lc.ls != nil &&
+		(lc.ls.revokedLeases == nil ||
+			lc.ls.aliveLeases == nil ||
+			lc.ls.shortLivedLeases == nil) {
+		return nil
+	}
+
 	cli, err := lc.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(time.Second))
 	if err != nil {
 		return fmt.Errorf("%v (%q)", err, lc.m.EtcdClientEndpoint)
@@ -114,6 +124,7 @@ func (lc *leaseChecker) Check() error {
 		}
 	}()
 	lc.cli = cli
+
 	if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil {
 		return err
 	}

+ 43 - 58
tools/functional-tester/tester/cluster.go

@@ -61,7 +61,6 @@ type Cluster struct {
 }
 
 func newCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
-	lg.Info("reading configuration file", zap.String("path", fpath))
 	bts, err := ioutil.ReadFile(fpath)
 	if err != nil {
 		return nil, err
@@ -204,7 +203,6 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
 	clus.failures = make([]Failure, 0)
 
 	for i, ap := range clus.Members {
-		clus.lg.Info("connecting", zap.String("agent-address", ap.AgentAddr))
 		var err error
 		clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...)
 		if err != nil {
@@ -213,7 +211,6 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
 		clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i])
 		clus.lg.Info("connected", zap.String("agent-address", ap.AgentAddr))
 
-		clus.lg.Info("creating stream", zap.String("agent-address", ap.AgentAddr))
 		clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background())
 		if err != nil {
 			return nil, err
@@ -240,7 +237,9 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
 		rate.Limit(int(clus.Tester.StressQPS)),
 		int(clus.Tester.StressQPS),
 	)
+
 	clus.updateStresserChecker()
+
 	return clus, nil
 }
 
@@ -265,32 +264,48 @@ func (clus *Cluster) updateFailures() {
 		switch cs {
 		case "KILL_ONE_FOLLOWER":
 			clus.failures = append(clus.failures, newFailureKillOneFollower())
+		case "KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
+			clus.failures = append(clus.failures, newFailureKillOneFollowerUntilTriggerSnapshot())
 		case "KILL_LEADER":
 			clus.failures = append(clus.failures, newFailureKillLeader())
-		case "KILL_ONE_FOLLOWER_FOR_LONG":
-			clus.failures = append(clus.failures, newFailureKillOneFollowerForLongTime())
-		case "KILL_LEADER_FOR_LONG":
-			clus.failures = append(clus.failures, newFailureKillLeaderForLongTime())
+		case "KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT":
+			clus.failures = append(clus.failures, newFailureKillLeaderUntilTriggerSnapshot())
 		case "KILL_QUORUM":
 			clus.failures = append(clus.failures, newFailureKillQuorum())
 		case "KILL_ALL":
 			clus.failures = append(clus.failures, newFailureKillAll())
+
 		case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER":
 			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollower(clus))
+		case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
+			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollowerUntilTriggerSnapshot())
 		case "BLACKHOLE_PEER_PORT_TX_RX_LEADER":
 			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeader(clus))
+		case "BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
+			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeaderUntilTriggerSnapshot())
+		case "BLACKHOLE_PEER_PORT_TX_RX_QUORUM":
+			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxQuorum(clus))
 		case "BLACKHOLE_PEER_PORT_TX_RX_ALL":
 			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxAll(clus))
+
 		case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
 			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus))
+		case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
+			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot())
 		case "DELAY_PEER_PORT_TX_RX_LEADER":
 			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus))
+		case "DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
+			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot())
+		case "DELAY_PEER_PORT_TX_RX_QUORUM":
+			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxQuorum(clus))
 		case "DELAY_PEER_PORT_TX_RX_ALL":
 			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus))
+
 		case "NO_FAIL_WITH_STRESS":
 			clus.failures = append(clus.failures, newFailureNoFailWithStress(clus))
 		case "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS":
 			clus.failures = append(clus.failures, newFailureNoFailWithNoStressForLiveness(clus))
+
 		case "EXTERNAL":
 			clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath))
 		case "FAILPOINTS":
@@ -317,7 +332,6 @@ func (clus *Cluster) shuffleFailures() {
 	n := len(clus.failures)
 	cp := coprime(n)
 
-	clus.lg.Info("shuffling test failure cases", zap.Int("total", n))
 	fs := make([]Failure, n)
 	for i := 0; i < n; i++ {
 		fs[i] = clus.failures[(cp*i+offset)%n]
@@ -355,12 +369,6 @@ func gcd(x, y int) int {
 }
 
 func (clus *Cluster) updateStresserChecker() {
-	clus.lg.Info(
-		"updating stressers",
-		zap.Int("round", clus.rd),
-		zap.Int("case", clus.cs),
-	)
-
 	cs := &compositeStresser{}
 	for _, m := range clus.Members {
 		cs.stressers = append(cs.stressers, newStresser(clus, m))
@@ -397,21 +405,17 @@ func (clus *Cluster) checkConsistency() (err error) {
 		}
 	}()
 
-	clus.lg.Info(
-		"checking consistency and invariant of cluster",
-		zap.Int("round", clus.rd),
-		zap.Int("case", clus.cs),
-		zap.String("desc", clus.failures[clus.cs].Desc()),
-	)
 	if err = clus.checker.Check(); err != nil {
 		clus.lg.Warn(
-			"checker.Check failed",
+			"consistency check FAIL",
+			zap.Int("round", clus.rd),
+			zap.Int("case", clus.cs),
 			zap.Error(err),
 		)
 		return err
 	}
 	clus.lg.Info(
-		"checked consistency and invariant of cluster",
+		"consistency check ALL PASS",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 		zap.String("desc", clus.failures[clus.cs].Desc()),
@@ -468,11 +472,6 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
 		clus.agentRequests[idx].Operation = op
 	}
 
-	clus.lg.Info(
-		"sending request",
-		zap.String("operation", op.String()),
-		zap.String("to", clus.Members[idx].EtcdClientEndpoint),
-	)
 	err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
 	clus.lg.Info(
 		"sent request",
@@ -484,11 +483,6 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
 		return err
 	}
 
-	clus.lg.Info(
-		"receiving response",
-		zap.String("operation", op.String()),
-		zap.String("from", clus.Members[idx].EtcdClientEndpoint),
-	)
 	resp, err := clus.agentStreams[idx].Recv()
 	if resp != nil {
 		clus.lg.Info(
@@ -519,22 +513,19 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
 
 // DestroyEtcdAgents terminates all tester connections to agents and etcd servers.
 func (clus *Cluster) DestroyEtcdAgents() {
-	clus.lg.Info("destroying etcd servers and agents")
 	err := clus.broadcastOperation(rpcpb.Operation_DestroyEtcdAgent)
 	if err != nil {
-		clus.lg.Warn("failed to destroy etcd servers and agents", zap.Error(err))
+		clus.lg.Warn("destroying etcd/agents FAIL", zap.Error(err))
 	} else {
-		clus.lg.Info("destroyed etcd servers and agents")
+		clus.lg.Info("destroying etcd/agents PASS")
 	}
 
 	for i, conn := range clus.agentConns {
-		clus.lg.Info("closing connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr))
 		err := conn.Close()
 		clus.lg.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
 	}
 
 	if clus.testerHTTPServer != nil {
-		clus.lg.Info("closing tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr))
 		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 		err := clus.testerHTTPServer.Shutdown(ctx)
 		cancel()
@@ -552,14 +543,9 @@ func (clus *Cluster) WaitHealth() error {
 	// reasonable workload (https://github.com/coreos/etcd/issues/2698)
 	for i := 0; i < 60; i++ {
 		for _, m := range clus.Members {
-			clus.lg.Info(
-				"writing health key",
-				zap.Int("retries", i),
-				zap.String("endpoint", m.EtcdClientEndpoint),
-			)
 			if err = m.WriteHealthKey(); err != nil {
 				clus.lg.Warn(
-					"writing health key failed",
+					"health check FAIL",
 					zap.Int("retries", i),
 					zap.String("endpoint", m.EtcdClientEndpoint),
 					zap.Error(err),
@@ -567,15 +553,16 @@ func (clus *Cluster) WaitHealth() error {
 				break
 			}
 			clus.lg.Info(
-				"wrote health key",
+				"health check PASS",
 				zap.Int("retries", i),
 				zap.String("endpoint", m.EtcdClientEndpoint),
 			)
 		}
 		if err == nil {
 			clus.lg.Info(
-				"writing health key success on all members",
-				zap.Int("retries", i),
+				"health check ALL PASS",
+				zap.Int("round", clus.rd),
+				zap.Int("case", clus.cs),
 			)
 			return nil
 		}
@@ -639,7 +626,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 
 	for i, m := range clus.Members {
 		clus.lg.Info(
-			"compacting",
+			"compact START",
 			zap.String("endpoint", m.EtcdClientEndpoint),
 			zap.Int64("compact-revision", rev),
 			zap.Duration("timeout", timeout),
@@ -657,7 +644,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 				)
 			} else {
 				clus.lg.Warn(
-					"compact failed",
+					"compact FAIL",
 					zap.String("endpoint", m.EtcdClientEndpoint),
 					zap.Int64("compact-revision", rev),
 					zap.Error(cerr),
@@ -669,7 +656,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 
 		if succeed {
 			clus.lg.Info(
-				"compacted",
+				"compact PASS",
 				zap.String("endpoint", m.EtcdClientEndpoint),
 				zap.Int64("compact-revision", rev),
 				zap.Duration("timeout", timeout),
@@ -693,24 +680,22 @@ func (clus *Cluster) checkCompact(rev int64) error {
 }
 
 func (clus *Cluster) defrag() error {
-	clus.lg.Info(
-		"defragmenting",
-		zap.Int("round", clus.rd),
-		zap.Int("case", clus.cs),
-	)
 	for _, m := range clus.Members {
 		if err := m.Defrag(); err != nil {
 			clus.lg.Warn(
-				"defrag failed",
-				zap.Int("round", clus.rd),
-				zap.Int("case", clus.cs),
+				"defrag FAIL",
+				zap.String("endpoint", m.EtcdClientEndpoint),
 				zap.Error(err),
 			)
 			return err
 		}
+		clus.lg.Info(
+			"defrag PASS",
+			zap.String("endpoint", m.EtcdClientEndpoint),
+		)
 	}
 	clus.lg.Info(
-		"defragmented",
+		"defrag ALL PASS",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 	)

+ 11 - 5
tools/functional-tester/tester/cluster_test.go

@@ -112,24 +112,30 @@ func Test_newCluster(t *testing.T) {
 		Tester: &rpcpb.Tester{
 			TesterNetwork:    "tcp",
 			TesterAddr:       "127.0.0.1:9028",
-			DelayLatencyMs:   500,
-			DelayLatencyMsRv: 50,
+			DelayLatencyMs:   5000,
+			DelayLatencyMsRv: 150,
 			RoundLimit:       1,
 			ExitOnFailure:    true,
 			ConsistencyCheck: true,
 			EnablePprof:      true,
 			FailureCases: []string{
 				"KILL_ONE_FOLLOWER",
+				"KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
 				"KILL_LEADER",
-				"KILL_ONE_FOLLOWER_FOR_LONG",
-				"KILL_LEADER_FOR_LONG",
+				"KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT",
 				"KILL_QUORUM",
 				"KILL_ALL",
 				"BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER",
+				"BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
 				"BLACKHOLE_PEER_PORT_TX_RX_LEADER",
+				"BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT",
+				"BLACKHOLE_PEER_PORT_TX_RX_QUORUM",
 				"BLACKHOLE_PEER_PORT_TX_RX_ALL",
 				"DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER",
+				"DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
 				"DELAY_PEER_PORT_TX_RX_LEADER",
+				"DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT",
+				"DELAY_PEER_PORT_TX_RX_QUORUM",
 				"DELAY_PEER_PORT_TX_RX_ALL",
 				"NO_FAIL_WITH_STRESS",
 				"NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS",
@@ -146,7 +152,7 @@ func Test_newCluster(t *testing.T) {
 			StressKeySuffixRangeTxn: 100,
 			StressKeyTxnOps:         10,
 			StressClients:           100,
-			StressQPS:               1000,
+			StressQPS:               2000,
 		},
 	}
 

+ 40 - 42
tools/functional-tester/tester/cluster_tester.go

@@ -39,7 +39,7 @@ func (clus *Cluster) StartTester() {
 
 		if err := clus.doRound(); err != nil {
 			clus.lg.Warn(
-				"doRound failed; returning",
+				"round FAIL",
 				zap.Int("round", clus.rd),
 				zap.Int("case", clus.cs),
 				zap.Error(err),
@@ -62,21 +62,21 @@ func (clus *Cluster) StartTester() {
 		timeout := 10 * time.Second
 		timeout += time.Duration(modifiedKey/compactQPS) * time.Second
 		clus.lg.Info(
-			"compacting",
+			"compact START",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
 			zap.Duration("timeout", timeout),
 		)
 		if err := clus.compact(revToCompact, timeout); err != nil {
 			clus.lg.Warn(
-				"compact failed",
+				"compact FAIL",
 				zap.Int("round", clus.rd),
 				zap.Int("case", clus.cs),
 				zap.Error(err),
 			)
 			if err = clus.cleanup(); err != nil {
 				clus.lg.Warn(
-					"cleanup failed",
+					"cleanup FAIL",
 					zap.Int("round", clus.rd),
 					zap.Int("case", clus.cs),
 					zap.Error(err),
@@ -88,12 +88,6 @@ func (clus *Cluster) StartTester() {
 		}
 		if round > 0 && round%500 == 0 { // every 500 rounds
 			if err := clus.defrag(); err != nil {
-				clus.lg.Warn(
-					"defrag failed; returning",
-					zap.Int("round", clus.rd),
-					zap.Int("case", clus.cs),
-					zap.Error(err),
-				)
 				clus.failed()
 				return
 			}
@@ -101,7 +95,7 @@ func (clus *Cluster) StartTester() {
 	}
 
 	clus.lg.Info(
-		"functional-tester passed",
+		"functional-tester PASS",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 	)
@@ -112,18 +106,20 @@ func (clus *Cluster) doRound() error {
 		clus.shuffleFailures()
 	}
 
+	roundNow := time.Now()
 	clus.lg.Info(
-		"starting round",
+		"round START",
 		zap.Int("round", clus.rd),
 		zap.Strings("failures", clus.failureStrings()),
 	)
-
 	for i, fa := range clus.failures {
 		clus.cs = i
 
 		caseTotalCounter.WithLabelValues(fa.Desc()).Inc()
+
+		caseNow := time.Now()
 		clus.lg.Info(
-			"failure case START",
+			"case START",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
 			zap.String("desc", fa.Desc()),
@@ -138,7 +134,7 @@ func (clus *Cluster) doRound() error {
 		fcase := fa.FailureCase()
 		if fcase != rpcpb.FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS {
 			clus.lg.Info(
-				"starting stressers before injecting failures",
+				"stresser START",
 				zap.Int("round", clus.rd),
 				zap.Int("case", clus.cs),
 				zap.String("desc", fa.Desc()),
@@ -150,7 +146,7 @@ func (clus *Cluster) doRound() error {
 		}
 
 		clus.lg.Info(
-			"injecting",
+			"inject START",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
 			zap.String("desc", fa.Desc()),
@@ -163,7 +159,7 @@ func (clus *Cluster) doRound() error {
 		// with stressing client ports
 		// TODO: use unix for local tests
 		clus.lg.Info(
-			"recovering",
+			"recover START",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
 			zap.String("desc", fa.Desc()),
@@ -173,13 +169,13 @@ func (clus *Cluster) doRound() error {
 		}
 
 		if stressStarted {
-			clus.lg.Info("pausing stresser after failure recovery, before wait health")
+			clus.lg.Info("stresser PAUSE")
 			ems := clus.stresser.Pause()
 			if fcase == rpcpb.FailureCase_NO_FAIL_WITH_STRESS && len(ems) > 0 {
 				ess := make([]string, 0, len(ems))
 				cnt := 0
 				for k, v := range ems {
-					ess = append(ess, fmt.Sprintf("%s (count %d)", k, v))
+					ess = append(ess, fmt.Sprintf("%s (count: %d)", k, v))
 					cnt += v
 				}
 				clus.lg.Warn(
@@ -187,34 +183,40 @@ func (clus *Cluster) doRound() error {
 					zap.String("desc", fa.Desc()),
 					zap.Strings("errors", ess),
 				)
-				return fmt.Errorf("expected no error in %q, got %q", fcase.String(), ess)
+
+				// with network delay, some in-flight requests may fail;
+				// only return an error if more than 10% of QPS worth of requests fail
+				if cnt > int(clus.Tester.StressQPS)/10 {
+					return fmt.Errorf("expected no error in %q, got %q", fcase.String(), ess)
+				}
 			}
 		}
 
-		clus.lg.Info("wait health after recover")
+		clus.lg.Info("health check START")
 		if err := clus.WaitHealth(); err != nil {
 			return fmt.Errorf("wait full health error: %v", err)
 		}
 
-		clus.lg.Info("check consistency after recover")
+		clus.lg.Info("consistency check START")
 		if err := clus.checkConsistency(); err != nil {
-			return fmt.Errorf("tt.checkConsistency error (%v)", err)
+			return fmt.Errorf("consistency check error (%v)", err)
 		}
 
 		clus.lg.Info(
-			"failure case PASS",
+			"case PASS",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
 			zap.String("desc", fa.Desc()),
+			zap.Duration("took", time.Since(caseNow)),
 		)
 	}
 
 	clus.lg.Info(
-		"finished round",
+		"round ALL PASS",
 		zap.Int("round", clus.rd),
 		zap.Strings("failures", clus.failureStrings()),
+		zap.Duration("took", time.Since(roundNow)),
 	)
-
 	return nil
 }
 
@@ -233,28 +235,24 @@ func (clus *Cluster) updateRevision() error {
 }
 
 func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
-	clus.lg.Info(
-		"compacting storage",
-		zap.Int64("current-revision", clus.currentRevision),
-		zap.Int64("compact-revision", rev),
-	)
 	if err = clus.compactKV(rev, timeout); err != nil {
+		clus.lg.Warn(
+			"compact FAIL",
+			zap.Int64("current-revision", clus.currentRevision),
+			zap.Int64("compact-revision", rev),
+			zap.Error(err),
+		)
 		return err
 	}
 	clus.lg.Info(
-		"compacted storage",
+		"compact DONE",
 		zap.Int64("current-revision", clus.currentRevision),
 		zap.Int64("compact-revision", rev),
 	)
 
-	clus.lg.Info(
-		"checking compaction",
-		zap.Int64("current-revision", clus.currentRevision),
-		zap.Int64("compact-revision", rev),
-	)
 	if err = clus.checkCompact(rev); err != nil {
 		clus.lg.Warn(
-			"checkCompact failed",
+			"check compact FAIL",
 			zap.Int64("current-revision", clus.currentRevision),
 			zap.Int64("compact-revision", rev),
 			zap.Error(err),
@@ -262,7 +260,7 @@ func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
 		return err
 	}
 	clus.lg.Info(
-		"confirmed compaction",
+		"check compact DONE",
 		zap.Int64("current-revision", clus.currentRevision),
 		zap.Int64("compact-revision", rev),
 	)
@@ -276,7 +274,7 @@ func (clus *Cluster) failed() {
 	}
 
 	clus.lg.Info(
-		"exiting on failure",
+		"functional-tester FAIL",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 	)
@@ -303,7 +301,7 @@ func (clus *Cluster) cleanup() error {
 
 	if err := clus.FailArchive(); err != nil {
 		clus.lg.Warn(
-			"cleanup failed",
+			"cleanup FAIL",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
 			zap.Error(err),
@@ -312,7 +310,7 @@ func (clus *Cluster) cleanup() error {
 	}
 	if err := clus.Restart(); err != nil {
 		clus.lg.Warn(
-			"restart failed",
+			"restart FAIL",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
 			zap.Error(err),

+ 80 - 37
tools/functional-tester/tester/failure.go

@@ -20,6 +20,8 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
+
+	"go.uber.org/zap"
 )
 
 // Failure defines failure injection interface.
@@ -43,15 +45,15 @@ type injectMemberFunc func(*Cluster, int) error
 type recoverMemberFunc func(*Cluster, int) error
 
 type failureByFunc struct {
-	desc
+	desc          string
 	failureCase   rpcpb.FailureCase
 	injectMember  injectMemberFunc
 	recoverMember recoverMemberFunc
 }
 
 func (f *failureByFunc) Desc() string {
-	if string(f.desc) != "" {
-		return string(f.desc)
+	if f.desc != "" {
+		return f.desc
 	}
 	return f.failureCase.String()
 }
@@ -100,8 +102,8 @@ func (f *failureFollower) Recover(clus *Cluster) error {
 }
 
 func (f *failureFollower) Desc() string {
-	if string(f.desc) != "" {
-		return string(f.desc)
+	if f.desc != "" {
+		return f.desc
 	}
 	return f.failureCase.String()
 }
@@ -162,8 +164,8 @@ func (f *failureQuorum) Recover(clus *Cluster) error {
 }
 
 func (f *failureQuorum) Desc() string {
-	if string(f.desc) != "" {
-		return string(f.desc)
+	if f.desc != "" {
+		return f.desc
 	}
 	return f.failureCase.String()
 }
@@ -172,6 +174,18 @@ func (f *failureQuorum) FailureCase() rpcpb.FailureCase {
 	return f.failureCase
 }
 
+func killMap(size int, seed int) map[int]bool {
+	m := make(map[int]bool)
+	r := rand.New(rand.NewSource(int64(seed)))
+	majority := size/2 + 1
+	for {
+		m[r.Intn(size)] = true
+		if len(m) >= majority {
+			return m
+		}
+	}
+}
+
 type failureAll failureByFunc
 
 func (f *failureAll) Inject(clus *Cluster) error {
@@ -193,8 +207,8 @@ func (f *failureAll) Recover(clus *Cluster) error {
 }
 
 func (f *failureAll) Desc() string {
-	if string(f.desc) != "" {
-		return string(f.desc)
+	if f.desc != "" {
+		return f.desc
 	}
 	return f.failureCase.String()
 }
@@ -205,13 +219,15 @@ func (f *failureAll) FailureCase() rpcpb.FailureCase {
 
 // failureUntilSnapshot injects a failure and waits for a snapshot event
 type failureUntilSnapshot struct {
-	desc        desc
+	desc        string
 	failureCase rpcpb.FailureCase
-
 	Failure
 }
 
-const snapshotCount = 10000
+var slowCases = map[rpcpb.FailureCase]bool{
+	rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: true,
+	rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT:       true,
+}
 
 func (f *failureUntilSnapshot) Inject(clus *Cluster) error {
 	if err := f.Failure.Inject(clus); err != nil {
@@ -220,6 +236,18 @@ func (f *failureUntilSnapshot) Inject(clus *Cluster) error {
 	if len(clus.Members) < 3 {
 		return nil
 	}
+
+	snapshotCount := clus.Members[0].Etcd.SnapshotCount
+
+	now := time.Now()
+	clus.lg.Info(
+		"trigger snapshot START",
+		zap.Int("round", clus.rd),
+		zap.Int("case", clus.cs),
+		zap.String("desc", f.Desc()),
+		zap.Int64("etcd-snapshot-count", snapshotCount),
+	)
+
 	// maxRev may fail since failure just injected, retry if failed.
 	startRev, err := clus.maxRev()
 	for i := 0; i < 10 && startRev == 0; i++ {
@@ -229,44 +257,59 @@ func (f *failureUntilSnapshot) Inject(clus *Cluster) error {
 		return err
 	}
 	lastRev := startRev
-	// Normal healthy cluster could accept 1000req/s at least.
-	// Give it 3-times time to create a new snapshot.
-	retry := snapshotCount / 1000 * 3
-	for j := 0; j < retry; j++ {
+
+	// a healthy cluster can accept at least 1000 req/sec;
+	// give it 3x that time to trigger a snapshot.
+	retries := int(snapshotCount) / 1000 * 3
+	if v, ok := slowCases[f.FailureCase()]; v && ok {
+		// slow network takes more retries
+		retries *= 4
+	}
+
+	for i := 0; i < retries; i++ {
 		lastRev, _ = clus.maxRev()
 		// If the number of proposals committed is bigger than snapshot count,
 		// a new snapshot should have been created.
-		if lastRev-startRev > snapshotCount {
+		diff := lastRev - startRev
+		if diff > snapshotCount {
+			clus.lg.Info(
+				"trigger snapshot PASS",
+				zap.Int("round", clus.rd),
+				zap.Int("case", clus.cs),
+				zap.Int("retries", i),
+				zap.String("desc", f.Desc()),
+				zap.Int64("committed-entries", diff),
+				zap.Int64("etcd-snapshot-count", snapshotCount),
+				zap.Int64("last-revision", lastRev),
+				zap.Duration("took", time.Since(now)),
+			)
 			return nil
 		}
+
+		clus.lg.Info(
+			"trigger snapshot PROGRESS",
+			zap.Int("retries", i),
+			zap.Int64("committed-entries", diff),
+			zap.Int64("etcd-snapshot-count", snapshotCount),
+			zap.Int64("last-revision", lastRev),
+			zap.Duration("took", time.Since(now)),
+		)
 		time.Sleep(time.Second)
 	}
-	return fmt.Errorf("cluster too slow: only commit %d requests in %ds", lastRev-startRev, retry)
+
+	return fmt.Errorf("cluster too slow: only %d commits in %d retries", lastRev-startRev, retries)
 }
 
 func (f *failureUntilSnapshot) Desc() string {
-	if f.desc.Desc() != "" {
-		return f.desc.Desc()
+	if f.desc != "" {
+		return f.desc
 	}
-	return f.failureCase.String()
+	if f.failureCase.String() != "" {
+		return f.failureCase.String()
+	}
+	return f.Failure.Desc()
 }
 
 func (f *failureUntilSnapshot) FailureCase() rpcpb.FailureCase {
 	return f.failureCase
 }
-
-func killMap(size int, seed int) map[int]bool {
-	m := make(map[int]bool)
-	r := rand.New(rand.NewSource(int64(seed)))
-	majority := size/2 + 1
-	for {
-		m[r.Intn(size)] = true
-		if len(m) >= majority {
-			return m
-		}
-	}
-}
-
-type desc string
-
-func (d desc) Desc() string { return string(d) }

+ 5 - 5
tools/functional-tester/tester/failure_case_failpoints.go

@@ -51,7 +51,7 @@ func failpointFailures(clus *Cluster) (ret []Failure, err error) {
 			if strings.Contains(fp, "Snap") {
 				// hack to trigger snapshot failpoints
 				fpFails[i] = &failureUntilSnapshot{
-					desc:        desc(fpf.Desc()),
+					desc:        fpf.Desc(),
 					failureCase: rpcpb.FailureCase_FAILPOINTS,
 					Failure:     fpf,
 				}
@@ -95,7 +95,7 @@ func failuresFromFailpoint(fp string, failpointCommands []string) (fs []Failure)
 		fs = append(fs, []Failure{
 			&failureFollower{
 				failureByFunc: failureByFunc{
-					desc:          desc(fmt.Sprintf("failpoint %q (one: %q)", fp, fcmd)),
+					desc:          fmt.Sprintf("failpoint %q (one: %q)", fp, fcmd),
 					failureCase:   rpcpb.FailureCase_FAILPOINTS,
 					injectMember:  inject,
 					recoverMember: recov,
@@ -105,7 +105,7 @@ func failuresFromFailpoint(fp string, failpointCommands []string) (fs []Failure)
 			},
 			&failureLeader{
 				failureByFunc: failureByFunc{
-					desc:          desc(fmt.Sprintf("failpoint %q (leader: %q)", fp, fcmd)),
+					desc:          fmt.Sprintf("failpoint %q (leader: %q)", fp, fcmd),
 					failureCase:   rpcpb.FailureCase_FAILPOINTS,
 					injectMember:  inject,
 					recoverMember: recov,
@@ -114,13 +114,13 @@ func failuresFromFailpoint(fp string, failpointCommands []string) (fs []Failure)
 				lead: -1,
 			},
 			&failureQuorum{
-				desc:          desc(fmt.Sprintf("failpoint %q (quorum: %q)", fp, fcmd)),
+				desc:          fmt.Sprintf("failpoint %q (quorum: %q)", fp, fcmd),
 				failureCase:   rpcpb.FailureCase_FAILPOINTS,
 				injectMember:  inject,
 				recoverMember: recov,
 			},
 			&failureAll{
-				desc:          desc(fmt.Sprintf("failpoint %q (all: %q)", fp, fcmd)),
+				desc:          fmt.Sprintf("failpoint %q (all: %q)", fp, fcmd),
 				failureCase:   rpcpb.FailureCase_FAILPOINTS,
 				injectMember:  inject,
 				recoverMember: recov,

+ 4 - 4
tools/functional-tester/tester/failure_case_kill.go

@@ -58,16 +58,16 @@ func newFailureKillAll() Failure {
 	}
 }
 
-func newFailureKillOneFollowerForLongTime() Failure {
+func newFailureKillOneFollowerUntilTriggerSnapshot() Failure {
 	return &failureUntilSnapshot{
-		failureCase: rpcpb.FailureCase_KILL_ONE_FOLLOWER_FOR_LONG,
+		failureCase: rpcpb.FailureCase_KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
 		Failure:     newFailureKillOneFollower(),
 	}
 }
 
-func newFailureKillLeaderForLongTime() Failure {
+func newFailureKillLeaderUntilTriggerSnapshot() Failure {
 	return &failureUntilSnapshot{
-		failureCase: rpcpb.FailureCase_KILL_LEADER_FOR_LONG,
+		failureCase: rpcpb.FailureCase_KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT,
 		Failure:     newFailureKillLeader(),
 	}
 }

+ 39 - 3
tools/functional-tester/tester/failure_case_network_blackhole.go

@@ -14,9 +14,7 @@
 
 package tester
 
-import (
-	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
-)
+import "github.com/coreos/etcd/tools/functional-tester/rpcpb"
 
 func injectBlackholePeerPortTxRx(clus *Cluster, idx int) error {
 	return clus.sendOperation(idx, rpcpb.Operation_BlackholePeerPortTxRx)
@@ -39,6 +37,19 @@ func newFailureBlackholePeerPortTxRxOneFollower(clus *Cluster) Failure {
 	}
 }
 
+func newFailureBlackholePeerPortTxRxOneFollowerUntilTriggerSnapshot() Failure {
+	ff := failureByFunc{
+		failureCase:   rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
+		injectMember:  injectBlackholePeerPortTxRx,
+		recoverMember: recoverBlackholePeerPortTxRx,
+	}
+	f := &failureFollower{ff, -1, -1}
+	return &failureUntilSnapshot{
+		failureCase: rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
+		Failure:     f,
+	}
+}
+
 func newFailureBlackholePeerPortTxRxLeader(clus *Cluster) Failure {
 	ff := failureByFunc{
 		failureCase:   rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_LEADER,
@@ -52,6 +63,31 @@ func newFailureBlackholePeerPortTxRxLeader(clus *Cluster) Failure {
 	}
 }
 
+func newFailureBlackholePeerPortTxRxLeaderUntilTriggerSnapshot() Failure {
+	ff := failureByFunc{
+		failureCase:   rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT,
+		injectMember:  injectBlackholePeerPortTxRx,
+		recoverMember: recoverBlackholePeerPortTxRx,
+	}
+	f := &failureLeader{ff, -1, -1}
+	return &failureUntilSnapshot{
+		failureCase: rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT,
+		Failure:     f,
+	}
+}
+
+func newFailureBlackholePeerPortTxRxQuorum(clus *Cluster) Failure {
+	f := &failureQuorum{
+		failureCase:   rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_QUORUM,
+		injectMember:  injectBlackholePeerPortTxRx,
+		recoverMember: recoverBlackholePeerPortTxRx,
+	}
+	return &failureDelay{
+		Failure:       f,
+		delayDuration: clus.GetFailureDelayDuration(),
+	}
+}
+
 func newFailureBlackholePeerPortTxRxAll(clus *Cluster) Failure {
 	f := &failureAll{
 		failureCase:   rpcpb.FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ALL,

+ 38 - 0
tools/functional-tester/tester/failure_case_network_slow.go → tools/functional-tester/tester/failure_case_network_delay.go

@@ -51,6 +51,19 @@ func newFailureDelayPeerPortTxRxOneFollower(clus *Cluster) Failure {
 	}
 }
 
+func newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot() Failure {
+	ff := failureByFunc{
+		failureCase:   rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
+		injectMember:  injectDelayPeerPortTxRx,
+		recoverMember: recoverDelayPeerPortTxRx,
+	}
+	f := &failureFollower{ff, -1, -1}
+	return &failureUntilSnapshot{
+		failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
+		Failure:     f,
+	}
+}
+
 func newFailureDelayPeerPortTxRxLeader(clus *Cluster) Failure {
 	ff := failureByFunc{
 		failureCase:   rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_LEADER,
@@ -64,6 +77,31 @@ func newFailureDelayPeerPortTxRxLeader(clus *Cluster) Failure {
 	}
 }
 
+func newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot() Failure {
+	ff := failureByFunc{
+		failureCase:   rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT,
+		injectMember:  injectDelayPeerPortTxRx,
+		recoverMember: recoverDelayPeerPortTxRx,
+	}
+	f := &failureLeader{ff, -1, -1}
+	return &failureUntilSnapshot{
+		failureCase: rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT,
+		Failure:     f,
+	}
+}
+
+func newFailureDelayPeerPortTxRxQuorum(clus *Cluster) Failure {
+	f := &failureQuorum{
+		failureCase:   rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_QUORUM,
+		injectMember:  injectDelayPeerPortTxRx,
+		recoverMember: recoverDelayPeerPortTxRx,
+	}
+	return &failureDelay{
+		Failure:       f,
+		delayDuration: clus.GetFailureDelayDuration(),
+	}
+}
+
 func newFailureDelayPeerPortTxRxAll(clus *Cluster) Failure {
 	f := &failureAll{
 		failureCase:   rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ALL,

+ 4 - 4
tools/functional-tester/tester/failure_case_no_fail.go

@@ -33,8 +33,8 @@ func (f *failureNoFailWithStress) Recover(clus *Cluster) error {
 }
 
 func (f *failureNoFailWithStress) Desc() string {
-	if f.desc.Desc() != "" {
-		return f.desc.Desc()
+	if f.desc != "" {
+		return f.desc
 	}
 	return f.failureCase.String()
 }
@@ -78,8 +78,8 @@ func (f *failureNoFailWithNoStressForLiveness) Recover(clus *Cluster) error {
 }
 
 func (f *failureNoFailWithNoStressForLiveness) Desc() string {
-	if f.desc.Desc() != "" {
-		return f.desc.Desc()
+	if f.desc != "" {
+		return f.desc
 	}
 	return f.failureCase.String()
 }

+ 12 - 5
tools/functional-tester/tester/local-test.yaml

@@ -76,8 +76,9 @@ tester-config:
   tester-network: tcp
   tester-addr: 127.0.0.1:9028
 
-  delay-latency-ms: 500
-  delay-latency-ms-rv: 50
+  # latency is made slow enough to trigger leader election
+  delay-latency-ms: 5000
+  delay-latency-ms-rv: 150
 
   round-limit: 1
   exit-on-failure: true
@@ -86,16 +87,22 @@ tester-config:
 
   failure-cases:
   - KILL_ONE_FOLLOWER
+  - KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
   - KILL_LEADER
-  - KILL_ONE_FOLLOWER_FOR_LONG
-  - KILL_LEADER_FOR_LONG
+  - KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT
   - KILL_QUORUM
   - KILL_ALL
   - BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER
+  - BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
   - BLACKHOLE_PEER_PORT_TX_RX_LEADER
+  - BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
+  - BLACKHOLE_PEER_PORT_TX_RX_QUORUM
   - BLACKHOLE_PEER_PORT_TX_RX_ALL
   - DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER
+  - DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
   - DELAY_PEER_PORT_TX_RX_LEADER
+  - DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
+  - DELAY_PEER_PORT_TX_RX_QUORUM
   - DELAY_PEER_PORT_TX_RX_ALL
   - NO_FAIL_WITH_STRESS
   - NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS
@@ -125,4 +132,4 @@ tester-config:
   stress-key-txn-ops: 10
 
   stress-clients: 100
-  stress-qps: 1000
+  stress-qps: 2000

+ 5 - 1
tools/functional-tester/tester/stress.go

@@ -41,7 +41,11 @@ type Stresser interface {
 func newStresser(clus *Cluster, m *rpcpb.Member) Stresser {
 	stressers := make([]Stresser, len(clus.Tester.StressTypes))
 	for i, stype := range clus.Tester.StressTypes {
-		clus.lg.Info("creating stresser", zap.String("type", stype))
+		clus.lg.Info(
+			"creating stresser",
+			zap.String("type", stype),
+			zap.String("endpoint", m.EtcdClientEndpoint),
+		)
 
 		switch stype {
 		case "KV":

+ 6 - 6
tools/functional-tester/tester/stress_key.go

@@ -102,7 +102,7 @@ func (s *keyStresser) Stress() error {
 	}
 
 	s.lg.Info(
-		"key stresser started in background",
+		"key stresser START",
 		zap.String("endpoint", s.m.EtcdClientEndpoint),
 	)
 	return nil
@@ -181,16 +181,16 @@ func (s *keyStresser) Close() map[string]int {
 	s.cli.Close()
 	s.wg.Wait()
 
-	s.lg.Info(
-		"key stresser is closed",
-		zap.String("endpoint", s.m.EtcdClientEndpoint),
-	)
-
 	s.emu.Lock()
 	s.paused = true
 	ess := s.ems
 	s.ems = make(map[string]int, 100)
 	s.emu.Unlock()
+
+	s.lg.Info(
+		"key stresser STOP",
+		zap.String("endpoint", s.m.EtcdClientEndpoint),
+	)
 	return ess
 }
 

+ 2 - 6
tools/functional-tester/tester/stress_lease.go

@@ -121,7 +121,7 @@ func (ls *leaseStresser) setupOnce() error {
 
 func (ls *leaseStresser) Stress() error {
 	ls.lg.Info(
-		"lease stresser is started",
+		"lease stresser START",
 		zap.String("endpoint", ls.m.EtcdClientEndpoint),
 	)
 
@@ -452,16 +452,12 @@ func (ls *leaseStresser) Pause() map[string]int {
 }
 
 func (ls *leaseStresser) Close() map[string]int {
-	ls.lg.Info(
-		"lease stresser is closing",
-		zap.String("endpoint", ls.m.EtcdClientEndpoint),
-	)
 	ls.cancel()
 	ls.runWg.Wait()
 	ls.aliveWg.Wait()
 	ls.cli.Close()
 	ls.lg.Info(
-		"lease stresser is closed",
+		"lease stresser STOP",
 		zap.String("endpoint", ls.m.EtcdClientEndpoint),
 	)
 	return nil