Browse Source

functional: update

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
Gyuho Lee 6 years ago
parent
commit
6e37ece3b9

+ 50 - 15
functional.yaml

@@ -1,9 +1,8 @@
 agent-configs:
-- etcd-exec-path: ./bin/etcd
+- etcd-exec: ./bin/etcd
   agent-addr: 127.0.0.1:19027
   failpoint-http-addr: http://127.0.0.1:7381
   base-dir: /tmp/etcd-functional-1
-  etcd-log-path: /tmp/etcd-functional-1/etcd.log
   etcd-client-proxy: false
   etcd-peer-proxy: true
   etcd-client-endpoint: 127.0.0.1:1379
@@ -30,7 +29,7 @@ agent-configs:
     initial-cluster: s1=https://127.0.0.1:1381,s2=https://127.0.0.1:2381,s3=https://127.0.0.1:3381
     initial-cluster-state: new
     initial-cluster-token: tkn
-    snapshot-count: 10000
+    snapshot-count: 2000
     quota-backend-bytes: 10740000000 # 10 GiB
     pre-vote: true
     initial-corrupt-check: true
@@ -48,11 +47,10 @@ agent-configs:
   peer-trusted-ca-path: ""
   snapshot-path: /tmp/etcd-functional-1.snapshot.db
 
-- etcd-exec-path: ./bin/etcd
+- etcd-exec: ./bin/etcd
   agent-addr: 127.0.0.1:29027
   failpoint-http-addr: http://127.0.0.1:7382
   base-dir: /tmp/etcd-functional-2
-  etcd-log-path: /tmp/etcd-functional-2/etcd.log
   etcd-client-proxy: false
   etcd-peer-proxy: true
   etcd-client-endpoint: 127.0.0.1:2379
@@ -79,7 +77,7 @@ agent-configs:
     initial-cluster: s1=https://127.0.0.1:1381,s2=https://127.0.0.1:2381,s3=https://127.0.0.1:3381
     initial-cluster-state: new
     initial-cluster-token: tkn
-    snapshot-count: 10000
+    snapshot-count: 2000
     quota-backend-bytes: 10740000000 # 10 GiB
     pre-vote: true
     initial-corrupt-check: true
@@ -97,11 +95,10 @@ agent-configs:
   peer-trusted-ca-path: ""
   snapshot-path: /tmp/etcd-functional-2.snapshot.db
 
-- etcd-exec-path: ./bin/etcd
+- etcd-exec: ./bin/etcd
   agent-addr: 127.0.0.1:39027
   failpoint-http-addr: http://127.0.0.1:7383
   base-dir: /tmp/etcd-functional-3
-  etcd-log-path: /tmp/etcd-functional-3/etcd.log
   etcd-client-proxy: false
   etcd-peer-proxy: true
   etcd-client-endpoint: 127.0.0.1:3379
@@ -128,7 +125,7 @@ agent-configs:
     initial-cluster: s1=https://127.0.0.1:1381,s2=https://127.0.0.1:2381,s3=https://127.0.0.1:3381
     initial-cluster-state: new
     initial-cluster-token: tkn
-    snapshot-count: 10000
+    snapshot-count: 2000
     quota-backend-bytes: 10740000000 # 10 GiB
     pre-vote: true
     initial-corrupt-check: true
@@ -163,7 +160,7 @@ tester-config:
   case-shuffle: true
 
   # For full descriptions,
-  # https://godoc.org/github.com/coreos/etcd/functional/rpcpb#Case
+  # https://godoc.org/github.com/etcd-io/etcd/functional/rpcpb#Case
   cases:
   - SIGTERM_ONE_FOLLOWER
   - SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
@@ -173,24 +170,62 @@ tester-config:
   - SIGTERM_ALL
   - SIGQUIT_AND_REMOVE_ONE_FOLLOWER
   - SIGQUIT_AND_REMOVE_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
-  - BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER
-  - BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
   - BLACKHOLE_PEER_PORT_TX_RX_LEADER
   - BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
   - BLACKHOLE_PEER_PORT_TX_RX_QUORUM
-  - DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER
+  - BLACKHOLE_PEER_PORT_TX_RX_ALL
   - DELAY_PEER_PORT_TX_RX_LEADER
+  - RANDOM_DELAY_PEER_PORT_TX_RX_LEADER
+  - DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
+  - RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
   - DELAY_PEER_PORT_TX_RX_QUORUM
+  - RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM
+  - DELAY_PEER_PORT_TX_RX_ALL
+  - RANDOM_DELAY_PEER_PORT_TX_RX_ALL
+  - NO_FAIL_WITH_STRESS
+  - NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS
+
+  # TODO: use iptables for discarding outbound rafthttp traffic to peer port
+  # - BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER
+  # - BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
+  # - DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER
+  # - RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER
+  # - DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
+  # - RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
+  # - SIGQUIT_AND_REMOVE_LEADER
+  # - SIGQUIT_AND_REMOVE_LEADER_UNTIL_TRIGGER_SNAPSHOT
+  # - SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH
 
   failpoint-commands:
   - panic("etcd-tester")
+  # - panic("etcd-tester"),1*sleep(1000)
 
   runner-exec-path: ./bin/etcd-runner
   external-exec-path: ""
 
+  # make up ±70% of workloads with writes
   stressers:
-  - KV
-  - LEASE
+  - type: KV_WRITE_SMALL
+    weight: 0.35
+  - type: KV_WRITE_LARGE
+    weight: 0.002
+  - type: KV_READ_ONE_KEY
+    weight: 0.07
+  - type: KV_READ_RANGE
+    weight: 0.07
+  - type: KV_DELETE_ONE_KEY
+    weight: 0.07
+  - type: KV_DELETE_RANGE
+    weight: 0.07
+  - type: KV_TXN_WRITE_DELETE
+    weight: 0.35
+  - type: LEASE
+    weight: 0.0
+
+  # - ELECTION_RUNNER
+  # - WATCH_RUNNER
+  # - LOCK_RACER_RUNNER
+  # - LEASE_RUNNER
 
   checkers:
   - KV_HASH

+ 1 - 1
functional/Dockerfile

@@ -39,4 +39,4 @@ RUN go get -v github.com/coreos/gofail \
   && cp ./bin/etcd-tester /bin/etcd-tester \
   && go build -v -o /bin/benchmark ./tools/benchmark \
   && popd \
-  && rm -rf ${GOPATH}/src/github.com/coreos/etcd
+  && rm -rf ${GOPATH}/src/github.com/coreos/etcd

+ 6 - 6
functional/README.md

@@ -4,7 +4,7 @@
 
 See [`rpcpb.Case`](https://godoc.org/github.com/coreos/etcd/functional/rpcpb#Case) for all failure cases.
 
-See [functional.yaml](https://github.com/coreos/etcd/blob/master/functional.yaml) for an example configuration.
+See [functional.yaml](https://github.com/etcd-io/etcd/blob/master/functional.yaml) for an example configuration.
 
 ### Run locally
 
@@ -16,7 +16,7 @@ PASSES=functional ./test
 
 ```bash
 pushd ..
-make build-docker-functional
+make build-docker-functional push-docker-functional pull-docker-functional
 popd
 ```
 
@@ -24,12 +24,12 @@ And run [example scripts](./scripts).
 
 ```bash
 # run 3 agents for 3-node local etcd cluster
-./scripts/docker-local-agent.sh 1
-./scripts/docker-local-agent.sh 2
-./scripts/docker-local-agent.sh 3
+./functional/scripts/docker-local-agent.sh 1
+./functional/scripts/docker-local-agent.sh 2
+./functional/scripts/docker-local-agent.sh 3
 
 # to run only 1 tester round
-./scripts/docker-local-tester.sh
+./functional/scripts/docker-local-tester.sh
 ```
 
 ## etcd Proxy

+ 199 - 130
functional/agent/handler.go

@@ -70,13 +70,13 @@ func (srv *Server) handleTesterRequest(req *rpcpb.Request) (resp *rpcpb.Response
 		return srv.handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT()
 
 	case rpcpb.Operation_BLACKHOLE_PEER_PORT_TX_RX:
-		return srv.handle_BLACKHOLE_PEER_PORT_TX_RX()
+		return srv.handle_BLACKHOLE_PEER_PORT_TX_RX(), nil
 	case rpcpb.Operation_UNBLACKHOLE_PEER_PORT_TX_RX:
-		return srv.handle_UNBLACKHOLE_PEER_PORT_TX_RX()
+		return srv.handle_UNBLACKHOLE_PEER_PORT_TX_RX(), nil
 	case rpcpb.Operation_DELAY_PEER_PORT_TX_RX:
-		return srv.handle_DELAY_PEER_PORT_TX_RX()
+		return srv.handle_DELAY_PEER_PORT_TX_RX(), nil
 	case rpcpb.Operation_UNDELAY_PEER_PORT_TX_RX:
-		return srv.handle_UNDELAY_PEER_PORT_TX_RX()
+		return srv.handle_UNDELAY_PEER_PORT_TX_RX(), nil
 
 	default:
 		msg := fmt.Sprintf("operation not found (%v)", req.Operation)
@@ -84,50 +84,125 @@ func (srv *Server) handleTesterRequest(req *rpcpb.Request) (resp *rpcpb.Response
 	}
 }
 
-func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Response, error) {
-	if srv.last != rpcpb.Operation_NOT_STARTED {
-		return &rpcpb.Response{
-			Success: false,
-			Status:  fmt.Sprintf("%q is not valid; last server operation was %q", rpcpb.Operation_INITIAL_START_ETCD.String(), srv.last.String()),
-			Member:  req.Member,
-		}, nil
+// just archive the first file
+func (srv *Server) createEtcdLogFile() error {
+	var err error
+	srv.etcdLogFile, err = os.Create(srv.Member.Etcd.LogOutputs[0])
+	if err != nil {
+		return err
 	}
+	srv.lg.Info("created etcd log file", zap.String("path", srv.Member.Etcd.LogOutputs[0]))
+	return nil
+}
 
-	err := fileutil.TouchDirAll(srv.Member.BaseDir)
-	if err != nil {
-		return nil, err
+func (srv *Server) creatEtcd(fromSnapshot bool) error {
+	if !fileutil.Exist(srv.Member.EtcdExec) {
+		return fmt.Errorf("unknown etcd exec path %q does not exist", srv.Member.EtcdExec)
 	}
-	srv.lg.Info("created base directory", zap.String("path", srv.Member.BaseDir))
 
-	if err = srv.createEtcdLogFile(); err != nil {
-		return nil, err
+	etcdPath, etcdFlags := srv.Member.EtcdExec, srv.Member.Etcd.Flags()
+	if fromSnapshot {
+		etcdFlags = srv.Member.EtcdOnSnapshotRestore.Flags()
 	}
+	u, _ := url.Parse(srv.Member.FailpointHTTPAddr)
+	srv.lg.Info(
+		"creating etcd command",
+		zap.String("etcd-exec", etcdPath),
+		zap.Strings("etcd-flags", etcdFlags),
+		zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr),
+		zap.String("failpoint-addr", u.Host),
+	)
+	srv.etcdCmd = exec.Command(etcdPath, etcdFlags...)
+	srv.etcdCmd.Env = []string{"GOFAIL_HTTP=" + u.Host}
+	srv.etcdCmd.Stdout = srv.etcdLogFile
+	srv.etcdCmd.Stderr = srv.etcdLogFile
+	return nil
+}
 
-	srv.creatEtcdCmd(false)
+// start but do not wait for it to complete
+func (srv *Server) runEtcd() error {
+	errc := make(chan error)
+	go func() {
+		time.Sleep(5 * time.Second)
+		// server advertise client/peer listeners have to start first
+		// before setting up proxy listener
+		errc <- srv.startProxy()
+	}()
 
-	if err = srv.saveTLSAssets(); err != nil {
-		return nil, err
-	}
-	if err = srv.startEtcdCmd(); err != nil {
-		return nil, err
+	if srv.etcdCmd != nil {
+		srv.lg.Info(
+			"starting etcd command",
+			zap.String("command-path", srv.etcdCmd.Path),
+		)
+		err := srv.etcdCmd.Start()
+		perr := <-errc
+		srv.lg.Info(
+			"started etcd command",
+			zap.String("command-path", srv.etcdCmd.Path),
+			zap.Errors("errors", []error{err, perr}),
+		)
+		if err != nil {
+			return err
+		}
+		return perr
 	}
-	srv.lg.Info("started etcd", zap.String("command-path", srv.etcdCmd.Path))
-	if err = srv.loadAutoTLSAssets(); err != nil {
-		return nil, err
+
+	select {
+	case <-srv.etcdServer.Server.ReadyNotify():
+		srv.lg.Info("embedded etcd is ready")
+	case <-time.After(time.Minute):
+		srv.etcdServer.Close()
+		return fmt.Errorf("took too long to start %v", <-srv.etcdServer.Err())
 	}
+	return <-errc
+}
 
-	// wait some time for etcd listener start
-	// before setting up proxy
-	time.Sleep(time.Second)
-	if err = srv.startProxy(); err != nil {
-		return nil, err
+// SIGQUIT to exit with stacktrace
+func (srv *Server) stopEtcd(sig os.Signal) error {
+	srv.stopProxy()
+
+	if srv.etcdCmd != nil {
+		srv.lg.Info(
+			"stopping etcd command",
+			zap.String("command-path", srv.etcdCmd.Path),
+			zap.String("signal", sig.String()),
+		)
+
+		err := srv.etcdCmd.Process.Signal(sig)
+		if err != nil {
+			return err
+		}
+
+		errc := make(chan error)
+		go func() {
+			_, ew := srv.etcdCmd.Process.Wait()
+			errc <- ew
+			close(errc)
+		}()
+
+		select {
+		case <-time.After(5 * time.Second):
+			srv.etcdCmd.Process.Kill()
+		case e := <-errc:
+			return e
+		}
+
+		err = <-errc
+
+		srv.lg.Info(
+			"stopped etcd command",
+			zap.String("command-path", srv.etcdCmd.Path),
+			zap.String("signal", sig.String()),
+			zap.Error(err),
+		)
+		return err
 	}
 
-	return &rpcpb.Response{
-		Success: true,
-		Status:  "start etcd PASS",
-		Member:  srv.Member,
-	}, nil
+	srv.lg.Info("stopping embedded etcd")
+	srv.etcdServer.Server.HardStop()
+	srv.etcdServer.Close()
+	srv.lg.Info("stopped embedded etcd")
+	return nil
 }
 
 func (srv *Server) startProxy() error {
@@ -141,6 +216,7 @@ func (srv *Server) startProxy() error {
 			return err
 		}
 
+		srv.lg.Info("starting proxy on client traffic", zap.String("url", advertiseClientURL.String()))
 		srv.advertiseClientPortToProxy[advertiseClientURLPort] = proxy.NewServer(proxy.ServerConfig{
 			Logger: srv.lg,
 			From:   *advertiseClientURL,
@@ -164,6 +240,7 @@ func (srv *Server) startProxy() error {
 			return err
 		}
 
+		srv.lg.Info("starting proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
 		srv.advertisePeerPortToProxy[advertisePeerURLPort] = proxy.NewServer(proxy.ServerConfig{
 			Logger: srv.lg,
 			From:   *advertisePeerURL,
@@ -222,34 +299,6 @@ func (srv *Server) stopProxy() {
 	}
 }
 
-func (srv *Server) createEtcdLogFile() error {
-	var err error
-	srv.etcdLogFile, err = os.Create(srv.Member.EtcdLogPath)
-	if err != nil {
-		return err
-	}
-	srv.lg.Info("created etcd log file", zap.String("path", srv.Member.EtcdLogPath))
-	return nil
-}
-
-func (srv *Server) creatEtcdCmd(fromSnapshot bool) {
-	etcdPath, etcdFlags := srv.Member.EtcdExecPath, srv.Member.Etcd.Flags()
-	if fromSnapshot {
-		etcdFlags = srv.Member.EtcdOnSnapshotRestore.Flags()
-	}
-	u, _ := url.Parse(srv.Member.FailpointHTTPAddr)
-	srv.lg.Info("creating etcd command",
-		zap.String("etcd-exec-path", etcdPath),
-		zap.Strings("etcd-flags", etcdFlags),
-		zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr),
-		zap.String("failpoint-addr", u.Host),
-	)
-	srv.etcdCmd = exec.Command(etcdPath, etcdFlags...)
-	srv.etcdCmd.Env = []string{"GOFAIL_HTTP=" + u.Host}
-	srv.etcdCmd.Stdout = srv.etcdLogFile
-	srv.etcdCmd.Stderr = srv.etcdLogFile
-}
-
 // if started with manual TLS, stores TLS assets
 // from tester/client to disk before starting etcd process
 func (srv *Server) saveTLSAssets() error {
@@ -322,7 +371,6 @@ func (srv *Server) saveTLSAssets() error {
 			zap.String("client-trusted-ca", srv.Member.ClientTrustedCAPath),
 		)
 	}
-
 	return nil
 }
 
@@ -412,9 +460,45 @@ func (srv *Server) loadAutoTLSAssets() error {
 	return nil
 }
 
-// start but do not wait for it to complete
-func (srv *Server) startEtcdCmd() error {
-	return srv.etcdCmd.Start()
+func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Response, error) {
+	if srv.last != rpcpb.Operation_NOT_STARTED {
+		return &rpcpb.Response{
+			Success: false,
+			Status:  fmt.Sprintf("%q is not valid; last server operation was %q", rpcpb.Operation_INITIAL_START_ETCD.String(), srv.last.String()),
+			Member:  req.Member,
+		}, nil
+	}
+
+	err := fileutil.TouchDirAll(srv.Member.BaseDir)
+	if err != nil {
+		return nil, err
+	}
+	srv.lg.Info("created base directory", zap.String("path", srv.Member.BaseDir))
+
+	if srv.etcdServer == nil {
+		if err = srv.createEtcdLogFile(); err != nil {
+			return nil, err
+		}
+	}
+
+	if err = srv.saveTLSAssets(); err != nil {
+		return nil, err
+	}
+	if err = srv.creatEtcd(false); err != nil {
+		return nil, err
+	}
+	if err = srv.runEtcd(); err != nil {
+		return nil, err
+	}
+	if err = srv.loadAutoTLSAssets(); err != nil {
+		return nil, err
+	}
+
+	return &rpcpb.Response{
+		Success: true,
+		Status:  "start etcd PASS",
+		Member:  srv.Member,
+	}, nil
 }
 
 func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) {
@@ -426,25 +510,16 @@ func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) {
 		}
 	}
 
-	srv.creatEtcdCmd(false)
-
 	if err = srv.saveTLSAssets(); err != nil {
 		return nil, err
 	}
-	if err = srv.startEtcdCmd(); err != nil {
+	if err = srv.creatEtcd(false); err != nil {
 		return nil, err
 	}
-	srv.lg.Info("restarted etcd", zap.String("command-path", srv.etcdCmd.Path))
-	if err = srv.loadAutoTLSAssets(); err != nil {
+	if err = srv.runEtcd(); err != nil {
 		return nil, err
 	}
-
-	// wait some time for etcd listener start
-	// before setting up proxy
-	// TODO: local tests should handle port conflicts
-	// with clients on restart
-	time.Sleep(time.Second)
-	if err = srv.startProxy(); err != nil {
+	if err = srv.loadAutoTLSAssets(); err != nil {
 		return nil, err
 	}
 
@@ -456,13 +531,15 @@ func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) {
 }
 
 func (srv *Server) handle_SIGTERM_ETCD() (*rpcpb.Response, error) {
-	srv.stopProxy()
-
-	err := stopWithSig(srv.etcdCmd, syscall.SIGTERM)
-	if err != nil {
+	if err := srv.stopEtcd(syscall.SIGTERM); err != nil {
 		return nil, err
 	}
-	srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))
+
+	if srv.etcdServer != nil {
+		// srv.etcdServer.GetLogger().Sync()
+	} else {
+		srv.etcdLogFile.Sync()
+	}
 
 	return &rpcpb.Response{
 		Success: true,
@@ -471,16 +548,17 @@ func (srv *Server) handle_SIGTERM_ETCD() (*rpcpb.Response, error) {
 }
 
 func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error) {
-	srv.stopProxy()
-
-	err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
+	err := srv.stopEtcd(syscall.SIGQUIT)
 	if err != nil {
 		return nil, err
 	}
-	srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
 
-	srv.etcdLogFile.Sync()
-	srv.etcdLogFile.Close()
+	if srv.etcdServer != nil {
+		// srv.etcdServer.GetLogger().Sync()
+	} else {
+		srv.etcdLogFile.Sync()
+		srv.etcdLogFile.Close()
+	}
 
 	// for debugging purposes, rename instead of removing
 	if err = os.RemoveAll(srv.Member.BaseDir + ".backup"); err != nil {
@@ -502,9 +580,6 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error
 			return nil, err
 		}
 	}
-	if err = srv.createEtcdLogFile(); err != nil {
-		return nil, err
-	}
 
 	return &rpcpb.Response{
 		Success: true,
@@ -537,25 +612,16 @@ func (srv *Server) handle_RESTORE_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response,
 }
 
 func (srv *Server) handle_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response, err error) {
-	srv.creatEtcdCmd(true)
-
 	if err = srv.saveTLSAssets(); err != nil {
 		return nil, err
 	}
-	if err = srv.startEtcdCmd(); err != nil {
+	if err = srv.creatEtcd(true); err != nil {
 		return nil, err
 	}
-	srv.lg.Info("restarted etcd", zap.String("command-path", srv.etcdCmd.Path))
-	if err = srv.loadAutoTLSAssets(); err != nil {
+	if err = srv.runEtcd(); err != nil {
 		return nil, err
 	}
-
-	// wait some time for etcd listener start
-	// before setting up proxy
-	// TODO: local tests should handle port conflicts
-	// with clients on restart
-	time.Sleep(time.Second)
-	if err = srv.startProxy(); err != nil {
+	if err = srv.loadAutoTLSAssets(); err != nil {
 		return nil, err
 	}
 
@@ -567,30 +633,32 @@ func (srv *Server) handle_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response, err err
 }
 
 func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, error) {
-	srv.stopProxy()
-
-	// exit with stackstrace
-	err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
+	err := srv.stopEtcd(syscall.SIGQUIT)
 	if err != nil {
 		return nil, err
 	}
-	srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
 
-	srv.etcdLogFile.Sync()
-	srv.etcdLogFile.Close()
+	if srv.etcdServer != nil {
+		// srv.etcdServer.GetLogger().Sync()
+	} else {
+		srv.etcdLogFile.Sync()
+		srv.etcdLogFile.Close()
+	}
 
 	// TODO: support separate WAL directory
 	if err = archive(
 		srv.Member.BaseDir,
-		srv.Member.EtcdLogPath,
+		srv.Member.Etcd.LogOutputs[0],
 		srv.Member.Etcd.DataDir,
 	); err != nil {
 		return nil, err
 	}
 	srv.lg.Info("archived data", zap.String("base-dir", srv.Member.BaseDir))
 
-	if err = srv.createEtcdLogFile(); err != nil {
-		return nil, err
+	if srv.etcdServer == nil {
+		if err = srv.createEtcdLogFile(); err != nil {
+			return nil, err
+		}
 	}
 
 	srv.lg.Info("cleaning up page cache")
@@ -607,16 +675,17 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, erro
 
 // stop proxy, etcd, delete data directory
 func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() (*rpcpb.Response, error) {
-	srv.stopProxy()
-
-	err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
+	err := srv.stopEtcd(syscall.SIGQUIT)
 	if err != nil {
 		return nil, err
 	}
-	srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
 
-	srv.etcdLogFile.Sync()
-	srv.etcdLogFile.Close()
+	if srv.etcdServer != nil {
+		// srv.etcdServer.GetLogger().Sync()
+	} else {
+		srv.etcdLogFile.Sync()
+		srv.etcdLogFile.Close()
+	}
 
 	err = os.RemoveAll(srv.Member.BaseDir)
 	if err != nil {
@@ -633,7 +702,7 @@ func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() (*rpcpb.
 	}, nil
 }
 
-func (srv *Server) handle_BLACKHOLE_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
+func (srv *Server) handle_BLACKHOLE_PEER_PORT_TX_RX() *rpcpb.Response {
 	for port, px := range srv.advertisePeerPortToProxy {
 		srv.lg.Info("blackholing", zap.Int("peer-port", port))
 		px.BlackholeTx()
@@ -643,10 +712,10 @@ func (srv *Server) handle_BLACKHOLE_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
 	return &rpcpb.Response{
 		Success: true,
 		Status:  "blackholed peer port tx/rx",
-	}, nil
+	}
 }
 
-func (srv *Server) handle_UNBLACKHOLE_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
+func (srv *Server) handle_UNBLACKHOLE_PEER_PORT_TX_RX() *rpcpb.Response {
 	for port, px := range srv.advertisePeerPortToProxy {
 		srv.lg.Info("unblackholing", zap.Int("peer-port", port))
 		px.UnblackholeTx()
@@ -656,10 +725,10 @@ func (srv *Server) handle_UNBLACKHOLE_PEER_PORT_TX_RX() (*rpcpb.Response, error)
 	return &rpcpb.Response{
 		Success: true,
 		Status:  "unblackholed peer port tx/rx",
-	}, nil
+	}
 }
 
-func (srv *Server) handle_DELAY_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
+func (srv *Server) handle_DELAY_PEER_PORT_TX_RX() *rpcpb.Response {
 	lat := time.Duration(srv.Tester.UpdatedDelayLatencyMs) * time.Millisecond
 	rv := time.Duration(srv.Tester.DelayLatencyMsRv) * time.Millisecond
 
@@ -681,10 +750,10 @@ func (srv *Server) handle_DELAY_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
 	return &rpcpb.Response{
 		Success: true,
 		Status:  "delayed peer port tx/rx",
-	}, nil
+	}
 }
 
-func (srv *Server) handle_UNDELAY_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
+func (srv *Server) handle_UNDELAY_PEER_PORT_TX_RX() *rpcpb.Response {
 	for port, px := range srv.advertisePeerPortToProxy {
 		srv.lg.Info("undelaying", zap.Int("peer-port", port))
 		px.UndelayTx()
@@ -694,5 +763,5 @@ func (srv *Server) handle_UNDELAY_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
 	return &rpcpb.Response{
 		Success: true,
 		Status:  "undelayed peer port tx/rx",
-	}, nil
+	}
 }

+ 14 - 10
functional/agent/server.go

@@ -21,6 +21,7 @@ import (
 	"os/exec"
 	"strings"
 
+	"github.com/coreos/etcd/embed"
 	"github.com/coreos/etcd/functional/rpcpb"
 	"github.com/coreos/etcd/pkg/proxy"
 
@@ -33,8 +34,9 @@ import (
 // no need to lock fields since request operations are
 // serialized in tester-side
 type Server struct {
+	lg *zap.Logger
+
 	grpcServer *grpc.Server
-	lg         *zap.Logger
 
 	network string
 	address string
@@ -46,6 +48,7 @@ type Server struct {
 	*rpcpb.Member
 	*rpcpb.Tester
 
+	etcdServer  *embed.Etcd
 	etcdCmd     *exec.Cmd
 	etcdLogFile *os.File
 
@@ -61,10 +64,10 @@ func NewServer(
 	address string,
 ) *Server {
 	return &Server{
-		lg:      lg,
-		network: network,
-		address: address,
-		last:    rpcpb.Operation_NOT_STARTED,
+		lg:                         lg,
+		network:                    network,
+		address:                    address,
+		last:                       rpcpb.Operation_NOT_STARTED,
 		advertiseClientPortToProxy: make(map[int]proxy.Server),
 		advertisePeerPortToProxy:   make(map[int]proxy.Server),
 	}
@@ -123,11 +126,12 @@ func (srv *Server) Stop() {
 }
 
 // Transport communicates with etcd tester.
-func (srv *Server) Transport(stream rpcpb.Transport_TransportServer) (err error) {
-	errc := make(chan error)
+func (srv *Server) Transport(stream rpcpb.Transport_TransportServer) (reterr error) {
+	errc := make(chan error, 1)
 	go func() {
 		for {
 			var req *rpcpb.Request
+			var err error
 			req, err = stream.Recv()
 			if err != nil {
 				errc <- err
@@ -158,9 +162,9 @@ func (srv *Server) Transport(stream rpcpb.Transport_TransportServer) (err error)
 	}()
 
 	select {
-	case err = <-errc:
+	case reterr = <-errc:
 	case <-stream.Context().Done():
-		err = stream.Context().Err()
+		reterr = stream.Context().Err()
 	}
-	return err
+	return reterr
 }

+ 14 - 16
functional/agent/utils.go

@@ -15,6 +15,7 @@
 package agent
 
 import (
+	"io"
 	"net"
 	"net/url"
 	"os"
@@ -36,7 +37,8 @@ func archive(baseDir, etcdLogPath, dataDir string) error {
 		return err
 	}
 
-	if err := os.Rename(etcdLogPath, filepath.Join(dir, "etcd.log")); err != nil {
+	dst := filepath.Join(dir, "etcd.log")
+	if err := copyFile(etcdLogPath, dst); err != nil {
 		if !os.IsNotExist(err) {
 			return err
 		}
@@ -79,27 +81,23 @@ func getURLAndPort(addr string) (urlAddr *url.URL, port int, err error) {
 	return urlAddr, port, err
 }
 
-func stopWithSig(cmd *exec.Cmd, sig os.Signal) error {
-	err := cmd.Process.Signal(sig)
+func copyFile(src, dst string) error {
+	f, err := os.Open(src)
 	if err != nil {
 		return err
 	}
+	defer f.Close()
 
-	errc := make(chan error)
-	go func() {
-		_, ew := cmd.Process.Wait()
-		errc <- ew
-		close(errc)
-	}()
+	w, err := os.Create(dst)
+	if err != nil {
+		return err
+	}
+	defer w.Close()
 
-	select {
-	case <-time.After(5 * time.Second):
-		cmd.Process.Kill()
-	case e := <-errc:
-		return e
+	if _, err = io.Copy(w, f); err != nil {
+		return err
 	}
-	err = <-errc
-	return err
+	return w.Sync()
 }
 
 func cleanPageCache() error {

+ 4 - 4
functional/build

@@ -5,7 +5,7 @@ if ! [[ "$0" =~ "functional/build" ]]; then
   exit 255
 fi
 
-CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ./bin/etcd-agent ./cmd/functional/cmd/etcd-agent
-CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ./bin/etcd-proxy ./cmd/functional/cmd/etcd-proxy
-CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ./bin/etcd-runner ./cmd/functional/cmd/etcd-runner
-CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ./bin/etcd-tester ./cmd/functional/cmd/etcd-tester
+CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ./bin/etcd-agent ./functional/cmd/etcd-agent
+CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ./bin/etcd-proxy ./functional/cmd/etcd-proxy
+CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ./bin/etcd-runner ./functional/cmd/etcd-runner
+CGO_ENABLED=0 go build -v -installsuffix cgo -ldflags "-s" -o ./bin/etcd-tester ./functional/cmd/etcd-tester

+ 7 - 4
functional/cmd/etcd-proxy/main.go

@@ -19,6 +19,8 @@ import (
 	"context"
 	"flag"
 	"fmt"
+	"io/ioutil"
+	"log"
 	"net/http"
 	"net/url"
 	"os"
@@ -62,8 +64,8 @@ $ make build-etcd-proxy
 $ ./bin/etcd-proxy --help
 $ ./bin/etcd-proxy --from localhost:23790 --to localhost:2379 --http-port 2378 --verbose
 
-$ ETCDCTL_API=3 ./bin/etcdctl --endpoints localhost:2379 put foo bar
-$ ETCDCTL_API=3 ./bin/etcdctl --endpoints localhost:23790 put foo bar`)
+$ ./bin/etcdctl --endpoints localhost:2379 put foo bar
+$ ./bin/etcdctl --endpoints localhost:23790 put foo bar`)
 		flag.PrintDefaults()
 	}
 
@@ -191,8 +193,9 @@ $ ETCDCTL_API=3 ./bin/etcdctl --endpoints localhost:23790 put foo bar`)
 		}
 	})
 	srv := &http.Server{
-		Addr:    fmt.Sprintf(":%d", httpPort),
-		Handler: mux,
+		Addr:     fmt.Sprintf(":%d", httpPort),
+		Handler:  mux,
+		ErrorLog: log.New(ioutil.Discard, "net/http", 0),
 	}
 	defer srv.Close()
 

+ 3 - 11
functional/rpcpb/etcd_config.go

@@ -50,15 +50,12 @@ var etcdFields = []string{
 
 	"SnapshotCount",
 	"QuotaBackendBytes",
-
-	// "PreVote",
-	// "InitialCorruptCheck",
 }
 
 // Flags returns etcd flags in string slice.
-func (cfg *Etcd) Flags() (fs []string) {
-	tp := reflect.TypeOf(*cfg)
-	vo := reflect.ValueOf(*cfg)
+func (e *Etcd) Flags() (fs []string) {
+	tp := reflect.TypeOf(*e)
+	vo := reflect.ValueOf(*e)
 	for _, name := range etcdFields {
 		field, ok := tp.FieldByName(name)
 		if !ok {
@@ -86,11 +83,6 @@ func (cfg *Etcd) Flags() (fs []string) {
 
 		fname := field.Tag.Get("yaml")
 
-		// not supported in old etcd
-		if fname == "pre-vote" || fname == "initial-corrupt-check" {
-			continue
-		}
-
 		if sv != "" {
 			fs = append(fs, fmt.Sprintf("--%s=%s", fname, sv))
 		}

+ 17 - 10
functional/rpcpb/etcd_config_test.go

@@ -19,11 +19,11 @@ import (
 	"testing"
 )
 
-func TestEtcdFlags(t *testing.T) {
-	cfg := &Etcd{
+func TestEtcd(t *testing.T) {
+	e := &Etcd{
 		Name:    "s1",
-		DataDir: "/tmp/etcd-agent-data-1/etcd.data",
-		WALDir:  "/tmp/etcd-agent-data-1/etcd.data/member/wal",
+		DataDir: "/tmp/etcd-functionl-1/etcd.data",
+		WALDir:  "/tmp/etcd-functionl-1/etcd.data/member/wal",
 
 		HeartbeatIntervalMs: 100,
 		ElectionTimeoutMs:   1000,
@@ -53,12 +53,16 @@ func TestEtcdFlags(t *testing.T) {
 
 		PreVote:             true,
 		InitialCorruptCheck: true,
+
+		Logger:     "zap",
+		LogOutputs: []string{"/tmp/etcd-functional-1/etcd.log"},
+		LogLevel:   "info",
 	}
 
-	exp := []string{
+	exps := []string{
 		"--name=s1",
-		"--data-dir=/tmp/etcd-agent-data-1/etcd.data",
-		"--wal-dir=/tmp/etcd-agent-data-1/etcd.data/member/wal",
+		"--data-dir=/tmp/etcd-functionl-1/etcd.data",
+		"--wal-dir=/tmp/etcd-functionl-1/etcd.data/member/wal",
 		"--heartbeat-interval=100",
 		"--election-timeout=1000",
 		"--listen-client-urls=https://127.0.0.1:1379",
@@ -76,9 +80,12 @@ func TestEtcdFlags(t *testing.T) {
 		"--quota-backend-bytes=10740000000",
 		"--pre-vote=true",
 		"--experimental-initial-corrupt-check=true",
+		"--logger=zap",
+		"--log-outputs=/tmp/etcd-functional-1/etcd.log",
+		"--log-level=info",
 	}
-	fs := cfg.Flags()
-	if !reflect.DeepEqual(exp, fs) {
-		t.Fatalf("expected %q, got %q", exp, fs)
+	fs := e.Flags()
+	if !reflect.DeepEqual(exps, fs) {
+		t.Fatalf("expected %q, got %q", exps, fs)
 	}
 }

+ 11 - 1
functional/rpcpb/member.go

@@ -23,9 +23,10 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/clientv3/snapshot"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/logutil"
 	"github.com/coreos/etcd/pkg/transport"
-	"github.com/coreos/etcd/snapshot"
 
 	"github.com/dustin/go-humanize"
 	"go.uber.org/zap"
@@ -94,10 +95,19 @@ func (m *Member) CreateEtcdClientConfig(opts ...grpc.DialOption) (cfg *clientv3.
 		}
 	}
 
+	// TODO: make this configurable
+	level := "error"
+	if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
+		level = "debug"
+	}
+	lcfg := logutil.DefaultZapLoggerConfig
+	lcfg.Level = zap.NewAtomicLevelAt(logutil.ConvertToZapLevel(level))
+
 	cfg = &clientv3.Config{
 		Endpoints:   []string{m.EtcdClientEndpoint},
 		DialTimeout: 10 * time.Second,
 		DialOptions: opts,
+		LogConfig:   &lcfg,
 	}
 	if secure {
 		// assume saved TLS assets are already stored on disk

File diff suppressed because it is too large
+ 636 - 168
functional/rpcpb/rpc.pb.go


+ 32 - 22
functional/rpcpb/rpc.proto

@@ -45,9 +45,8 @@ service Transport {
 }
 
 message Member {
-  // EtcdExecPath is the executable etcd binary path in agent server.
-  string EtcdExecPath = 1 [(gogoproto.moretags) = "yaml:\"etcd-exec-path\""];
-  // TODO: support embedded etcd
+  // EtcdExec is the executable etcd binary path in agent server.
+  string EtcdExec = 1 [(gogoproto.moretags) = "yaml:\"etcd-exec\""];
 
   // AgentAddr is the agent HTTP server address.
   string AgentAddr = 11 [(gogoproto.moretags) = "yaml:\"agent-addr\""];
@@ -56,8 +55,6 @@ message Member {
 
   // BaseDir is the base directory where all logs and etcd data are stored.
   string BaseDir = 101 [(gogoproto.moretags) = "yaml:\"base-dir\""];
-  // EtcdLogPath is the log file to store current etcd server logs.
-  string EtcdLogPath = 102 [(gogoproto.moretags) = "yaml:\"etcd-log-path\""];
 
   // EtcdClientProxy is true when client traffic needs to be proxied.
   // If true, listen client URL port must be different than advertise client URL port.
@@ -141,7 +138,7 @@ message Tester {
 
   // Stressers is the list of stresser types:
   // KV, LEASE, ELECTION_RUNNER, WATCH_RUNNER, LOCK_RACER_RUNNER, LEASE_RUNNER.
-  repeated string Stressers = 101 [(gogoproto.moretags) = "yaml:\"stressers\""];
+  repeated Stresser Stressers = 101 [(gogoproto.moretags) = "yaml:\"stressers\""];
   // Checkers is the list of consistency checker types:
   // KV_HASH, LEASE_EXPIRE, NO_CHECK, RUNNER.
   // Leave empty to skip consistency checks.
@@ -167,6 +164,35 @@ message Tester {
   int32 StressQPS = 302 [(gogoproto.moretags) = "yaml:\"stress-qps\""];
 }
 
+enum StresserType {
+  KV_WRITE_SMALL = 0;
+  KV_WRITE_LARGE = 1;
+  KV_READ_ONE_KEY = 2;
+  KV_READ_RANGE = 3;
+  KV_DELETE_ONE_KEY = 4;
+  KV_DELETE_RANGE = 5;
+  KV_TXN_WRITE_DELETE = 6;
+
+  LEASE = 10;
+
+  ELECTION_RUNNER = 20;
+  WATCH_RUNNER = 31;
+  LOCK_RACER_RUNNER = 41;
+  LEASE_RUNNER = 51;
+}
+
+message Stresser {
+  string Type = 1 [(gogoproto.moretags) = "yaml:\"type\""];
+  double Weight = 2 [(gogoproto.moretags) = "yaml:\"weight\""];
+}
+
+enum Checker {
+  KV_HASH = 0;
+  LEASE_EXPIRE = 1;
+  RUNNER = 2;
+  NO_CHECK = 3;
+}
+
 message Etcd {
   string Name = 1 [(gogoproto.moretags) = "yaml:\"name\""];
   string DataDir = 2 [(gogoproto.moretags) = "yaml:\"data-dir\""];
@@ -594,19 +620,3 @@ enum Case {
   // EXTERNAL runs external failure injection scripts.
   EXTERNAL = 500;
 }
-
-enum Stresser {
-  KV = 0;
-  LEASE = 1;
-  ELECTION_RUNNER = 2;
-  WATCH_RUNNER = 3;
-  LOCK_RACER_RUNNER = 4;
-  LEASE_RUNNER = 5;
-}
-
-enum Checker {
-  KV_HASH = 0;
-  LEASE_EXPIRE = 1;
-  RUNNER = 2;
-  NO_CHECK = 3;
-}

+ 1 - 1
functional/runner/global.go

@@ -47,7 +47,7 @@ type roundClient struct {
 func newClient(eps []string, timeout time.Duration) *clientv3.Client {
 	c, err := clientv3.New(clientv3.Config{
 		Endpoints:   eps,
-		DialTimeout: time.Duration(timeout) * time.Second,
+		DialTimeout: timeout * time.Second,
 	})
 	if err != nil {
 		log.Fatal(err)

+ 2 - 2
functional/scripts/docker-local-agent.sh

@@ -13,7 +13,7 @@ if ! [[ "${0}" =~ "scripts/docker-local-agent.sh" ]]; then
 fi
 
 if [[ -z "${GO_VERSION}" ]]; then
-  GO_VERSION=1.10.1
+  GO_VERSION=1.12.8
 fi
 echo "Running with GO_VERSION:" ${GO_VERSION}
 
@@ -38,5 +38,5 @@ docker run \
   --rm \
   --net=host \
   --name ${AGENT_NAME} \
-  gcr.io/etcd-development/etcd-functional-tester:go${GO_VERSION} \
+  gcr.io/etcd-development/etcd-functional:go${GO_VERSION} \
   /bin/bash -c "./bin/etcd-agent ${AGENT_ADDR_FLAG}"

+ 2 - 2
functional/scripts/docker-local-tester.sh

@@ -6,7 +6,7 @@ if ! [[ "${0}" =~ "scripts/docker-local-tester.sh" ]]; then
 fi
 
 if [[ -z "${GO_VERSION}" ]]; then
-  GO_VERSION=1.10.1
+  GO_VERSION=1.12.8
 fi
 echo "Running with GO_VERSION:" ${GO_VERSION}
 
@@ -14,5 +14,5 @@ docker run \
   --rm \
   --net=host \
   --name tester \
-  gcr.io/etcd-development/etcd-functional-tester:go${GO_VERSION} \
+  gcr.io/etcd-development/etcd-functional:go${GO_VERSION} \
   /bin/bash -c "./bin/etcd-tester --config ./functional.yaml"

+ 2 - 2
functional/scripts/genproto.sh

@@ -7,8 +7,8 @@ if ! [[ "$0" =~ "scripts/genproto.sh" ]]; then
 fi
 
 # for now, be conservative about what version of protoc we expect
-if ! [[ $(protoc --version) =~ "3.5.1" ]]; then
-  echo "could not find protoc 3.5.1, is it installed + in PATH?"
+if ! [[ $(protoc --version) =~ "3.7.1" ]]; then
+  echo "could not find protoc 3.7.1, is it installed + in PATH?"
   exit 255
 fi
 

+ 17 - 6
functional/tester/case.go

@@ -275,6 +275,18 @@ func (c *caseUntilSnapshot) Inject(clus *Cluster) error {
 
 	for i := 0; i < retries; i++ {
 		lastRev, err = clus.maxRev()
+		if lastRev == 0 {
+			clus.lg.Info(
+				"trigger snapshot RETRY",
+				zap.Int("retries", i),
+				zap.Int64("etcd-snapshot-count", snapshotCount),
+				zap.Int64("start-revision", startRev),
+				zap.Error(err),
+			)
+			time.Sleep(3 * time.Second)
+			continue
+		}
+
 		// If the number of proposals committed is bigger than snapshot count,
 		// a new snapshot should have been created.
 		diff := lastRev - startRev
@@ -292,12 +304,8 @@ func (c *caseUntilSnapshot) Inject(clus *Cluster) error {
 			return nil
 		}
 
-		dur := time.Second
-		if diff < 0 || err != nil {
-			dur = 3 * time.Second
-		}
 		clus.lg.Info(
-			"trigger snapshot PROGRESS",
+			"trigger snapshot RETRY",
 			zap.Int("retries", i),
 			zap.Int64("committed-entries", diff),
 			zap.Int64("etcd-snapshot-count", snapshotCount),
@@ -306,7 +314,10 @@ func (c *caseUntilSnapshot) Inject(clus *Cluster) error {
 			zap.Duration("took", time.Since(now)),
 			zap.Error(err),
 		)
-		time.Sleep(dur)
+		time.Sleep(time.Second)
+		if err != nil {
+			time.Sleep(2 * time.Second)
+		}
 	}
 
 	return fmt.Errorf("cluster too slow: only %d commits in %d retries", lastRev-startRev, retries)

+ 1 - 1
functional/tester/case_network_delay.go

@@ -26,7 +26,7 @@ const (
 	// Wait more when it recovers from slow network, because network layer
 	// needs extra time to propagate traffic control (tc command) change.
 	// Otherwise, we get different hash values from the previous revision.
-	// For more detail, please see https://github.com/coreos/etcd/issues/5121.
+	// For more detail, please see https://github.com/etcd-io/etcd/issues/5121.
 	waitRecover = 5 * time.Second
 )
 

+ 8 - 6
functional/tester/cluster.go

@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"log"
 	"math/rand"
 	"net/http"
 	"net/url"
@@ -106,8 +107,9 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
 		}
 	}
 	clus.testerHTTPServer = &http.Server{
-		Addr:    clus.Tester.Addr,
-		Handler: mux,
+		Addr:     clus.Tester.Addr,
+		Handler:  mux,
+		ErrorLog: log.New(ioutil.Discard, "net/http", 0),
 	}
 	go clus.serveTesterServer()
 
@@ -491,9 +493,9 @@ func (clus *Cluster) sendOpWithResp(idx int, op rpcpb.Operation) (*rpcpb.Respons
 
 	m, secure := clus.Members[idx], false
 	for _, cu := range m.Etcd.AdvertiseClientURLs {
-		u, err := url.Parse(cu)
-		if err != nil {
-			return nil, err
+		u, perr := url.Parse(cu)
+		if perr != nil {
+			return nil, perr
 		}
 		if u.Scheme == "https" { // TODO: handle unix
 			secure = true
@@ -591,7 +593,7 @@ func (clus *Cluster) WaitHealth() error {
 	// wait 60s to check cluster health.
 	// TODO: set it to a reasonable value. It is set that high because
 	// follower may use long time to catch up the leader when reboot under
-	// reasonable workload (https://github.com/coreos/etcd/issues/2698)
+	// reasonable workload (https://github.com/etcd-io/etcd/issues/2698)
 	for i := 0; i < 60; i++ {
 		for _, m := range clus.Members {
 			if err = m.WriteHealthKey(); err != nil {

+ 60 - 42
functional/tester/cluster_read_config.go

@@ -44,14 +44,56 @@ func read(lg *zap.Logger, fpath string) (*Cluster, error) {
 		return nil, fmt.Errorf("len(clus.Members) expects at least 3, got %d", len(clus.Members))
 	}
 
+	failpointsEnabled := false
+	for _, c := range clus.Tester.Cases {
+		if c == rpcpb.Case_FAILPOINTS.String() {
+			failpointsEnabled = true
+			break
+		}
+	}
+
+	if len(clus.Tester.Cases) == 0 {
+		return nil, errors.New("cases not found")
+	}
+	if clus.Tester.DelayLatencyMs <= clus.Tester.DelayLatencyMsRv*5 {
+		return nil, fmt.Errorf("delay latency %d ms must be greater than 5x of delay latency random variable %d ms", clus.Tester.DelayLatencyMs, clus.Tester.DelayLatencyMsRv)
+	}
+	if clus.Tester.UpdatedDelayLatencyMs == 0 {
+		clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
+	}
+
+	for _, v := range clus.Tester.Cases {
+		if _, ok := rpcpb.Case_value[v]; !ok {
+			return nil, fmt.Errorf("%q is not defined in 'rpcpb.Case_value'", v)
+		}
+	}
+
+	for _, s := range clus.Tester.Stressers {
+		if _, ok := rpcpb.StresserType_value[s.Type]; !ok {
+			return nil, fmt.Errorf("unknown 'StresserType' %+v", s)
+		}
+	}
+
+	for _, v := range clus.Tester.Checkers {
+		if _, ok := rpcpb.Checker_value[v]; !ok {
+			return nil, fmt.Errorf("Checker is unknown; got %q", v)
+		}
+	}
+
+	if clus.Tester.StressKeySuffixRangeTxn > 100 {
+		return nil, fmt.Errorf("StressKeySuffixRangeTxn maximum value is 100, got %v", clus.Tester.StressKeySuffixRangeTxn)
+	}
+	if clus.Tester.StressKeyTxnOps > 64 {
+		return nil, fmt.Errorf("StressKeyTxnOps maximum value is 64, got %v", clus.Tester.StressKeyTxnOps)
+	}
+
 	for i, mem := range clus.Members {
+		if mem.EtcdExec == "embed" && failpointsEnabled {
+			return nil, errors.New("EtcdExec 'embed' cannot be run with failpoints enabled")
+		}
 		if mem.BaseDir == "" {
 			return nil, fmt.Errorf("BaseDir cannot be empty (got %q)", mem.BaseDir)
 		}
-		if mem.EtcdLogPath == "" {
-			return nil, fmt.Errorf("EtcdLogPath cannot be empty (got %q)", mem.EtcdLogPath)
-		}
-
 		if mem.Etcd.Name == "" {
 			return nil, fmt.Errorf("'--name' cannot be empty (got %+v)", mem)
 		}
@@ -132,9 +174,6 @@ func read(lg *zap.Logger, fpath string) (*Cluster, error) {
 			}
 		}
 
-		if !strings.HasPrefix(mem.EtcdLogPath, mem.BaseDir) {
-			return nil, fmt.Errorf("EtcdLogPath must be prefixed with BaseDir (got %q)", mem.EtcdLogPath)
-		}
 		if !strings.HasPrefix(mem.Etcd.DataDir, mem.BaseDir) {
 			return nil, fmt.Errorf("Etcd.DataDir must be prefixed with BaseDir (got %q)", mem.Etcd.DataDir)
 		}
@@ -188,7 +227,7 @@ func read(lg *zap.Logger, fpath string) (*Cluster, error) {
 			return nil, fmt.Errorf("Etcd.PeerClientCertAuth and Etcd.PeerAutoTLS cannot be both 'true'")
 		}
 		if (mem.Etcd.PeerCertFile == "") != (mem.Etcd.PeerKeyFile == "") {
-			return nil, fmt.Errorf("Both Etcd.PeerCertFile %q and Etcd.PeerKeyFile %q must be either empty or non-empty", mem.Etcd.PeerCertFile, mem.Etcd.PeerKeyFile)
+			return nil, fmt.Errorf("both Etcd.PeerCertFile %q and Etcd.PeerKeyFile %q must be either empty or non-empty", mem.Etcd.PeerCertFile, mem.Etcd.PeerKeyFile)
 		}
 		if mem.Etcd.ClientCertAuth && mem.Etcd.ClientAutoTLS {
 			return nil, fmt.Errorf("Etcd.ClientCertAuth and Etcd.ClientAutoTLS cannot be both 'true'")
@@ -212,7 +251,7 @@ func read(lg *zap.Logger, fpath string) (*Cluster, error) {
 			return nil, fmt.Errorf("Etcd.ClientCertAuth 'false', but Etcd.ClientTrustedCAFile is %q", mem.Etcd.PeerCertFile)
 		}
 		if (mem.Etcd.ClientCertFile == "") != (mem.Etcd.ClientKeyFile == "") {
-			return nil, fmt.Errorf("Both Etcd.ClientCertFile %q and Etcd.ClientKeyFile %q must be either empty or non-empty", mem.Etcd.ClientCertFile, mem.Etcd.ClientKeyFile)
+			return nil, fmt.Errorf("both Etcd.ClientCertFile %q and Etcd.ClientKeyFile %q must be either empty or non-empty", mem.Etcd.ClientCertFile, mem.Etcd.ClientKeyFile)
 		}
 
 		peerTLS := mem.Etcd.PeerAutoTLS ||
@@ -317,42 +356,21 @@ func read(lg *zap.Logger, fpath string) (*Cluster, error) {
 				}
 				clus.Members[i].ClientCertData = string(data)
 			}
-		}
-	}
-
-	if len(clus.Tester.Cases) == 0 {
-		return nil, errors.New("Cases not found")
-	}
-	if clus.Tester.DelayLatencyMs <= clus.Tester.DelayLatencyMsRv*5 {
-		return nil, fmt.Errorf("delay latency %d ms must be greater than 5x of delay latency random variable %d ms", clus.Tester.DelayLatencyMs, clus.Tester.DelayLatencyMsRv)
-	}
-	if clus.Tester.UpdatedDelayLatencyMs == 0 {
-		clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
-	}
 
-	for _, v := range clus.Tester.Cases {
-		if _, ok := rpcpb.Case_value[v]; !ok {
-			return nil, fmt.Errorf("%q is not defined in 'rpcpb.Case_value'", v)
-		}
-	}
-
-	for _, v := range clus.Tester.Stressers {
-		if _, ok := rpcpb.Stresser_value[v]; !ok {
-			return nil, fmt.Errorf("Stresser is unknown; got %q", v)
-		}
-	}
-	for _, v := range clus.Tester.Checkers {
-		if _, ok := rpcpb.Checker_value[v]; !ok {
-			return nil, fmt.Errorf("Checker is unknown; got %q", v)
+			if len(mem.Etcd.LogOutputs) == 0 {
+				return nil, fmt.Errorf("mem.Etcd.LogOutputs cannot be empty")
+			}
+			for _, v := range mem.Etcd.LogOutputs {
+				switch v {
+				case "stderr", "stdout", "/dev/null", "default":
+				default:
+					if !strings.HasPrefix(v, mem.BaseDir) {
+						return nil, fmt.Errorf("LogOutput %q must be prefixed with BaseDir %q", v, mem.BaseDir)
+					}
+				}
+			}
 		}
 	}
 
-	if clus.Tester.StressKeySuffixRangeTxn > 100 {
-		return nil, fmt.Errorf("StressKeySuffixRangeTxn maximum value is 100, got %v", clus.Tester.StressKeySuffixRangeTxn)
-	}
-	if clus.Tester.StressKeyTxnOps > 64 {
-		return nil, fmt.Errorf("StressKeyTxnOps maximum value is 64, got %v", clus.Tester.StressKeyTxnOps)
-	}
-
 	return clus, err
 }

+ 2 - 2
functional/tester/cluster_run.go

@@ -212,8 +212,8 @@ func (clus *Cluster) doRound() error {
 				)
 
 				// with network delay, some ongoing requests may fail
-				// only return error, if more than 10% of QPS requests fail
-				if cnt > int(clus.Tester.StressQPS)/10 {
+				// only return error, if more than 30% of QPS requests fail
+				if cnt > int(float64(clus.Tester.StressQPS)*0.3) {
 					return fmt.Errorf("expected no error in %q, got %q", fcase.String(), ess)
 				}
 			}

+ 304 - 0
functional/tester/cluster_test.go

@@ -0,0 +1,304 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tester
+
+import (
+	"reflect"
+	"sort"
+	"testing"
+
+	"github.com/coreos/etcd/functional/rpcpb"
+
+	"go.uber.org/zap"
+)
+
+func Test_read(t *testing.T) {
+	exp := &Cluster{
+		Members: []*rpcpb.Member{
+			{
+				EtcdExec:           "./bin/etcd",
+				AgentAddr:          "127.0.0.1:19027",
+				FailpointHTTPAddr:  "http://127.0.0.1:7381",
+				BaseDir:            "/tmp/etcd-functional-1",
+				EtcdClientProxy:    false,
+				EtcdPeerProxy:      true,
+				EtcdClientEndpoint: "127.0.0.1:1379",
+				Etcd: &rpcpb.Etcd{
+					Name:                "s1",
+					DataDir:             "/tmp/etcd-functional-1/etcd.data",
+					WALDir:              "/tmp/etcd-functional-1/etcd.data/member/wal",
+					HeartbeatIntervalMs: 100,
+					ElectionTimeoutMs:   1000,
+					ListenClientURLs:    []string{"https://127.0.0.1:1379"},
+					AdvertiseClientURLs: []string{"https://127.0.0.1:1379"},
+					ClientAutoTLS:       true,
+					ClientCertAuth:      false,
+					ClientCertFile:      "",
+					ClientKeyFile:       "",
+					ClientTrustedCAFile: "",
+					ListenPeerURLs:      []string{"https://127.0.0.1:1380"},
+					AdvertisePeerURLs:   []string{"https://127.0.0.1:1381"},
+					PeerAutoTLS:         true,
+					PeerClientCertAuth:  false,
+					PeerCertFile:        "",
+					PeerKeyFile:         "",
+					PeerTrustedCAFile:   "",
+					InitialCluster:      "s1=https://127.0.0.1:1381,s2=https://127.0.0.1:2381,s3=https://127.0.0.1:3381",
+					InitialClusterState: "new",
+					InitialClusterToken: "tkn",
+					SnapshotCount:       10000,
+					QuotaBackendBytes:   10740000000,
+					PreVote:             true,
+					InitialCorruptCheck: true,
+					Logger:              "zap",
+					LogOutputs:          []string{"/tmp/etcd-functional-1/etcd.log"},
+					Debug:               true,
+				},
+				ClientCertData:      "",
+				ClientCertPath:      "",
+				ClientKeyData:       "",
+				ClientKeyPath:       "",
+				ClientTrustedCAData: "",
+				ClientTrustedCAPath: "",
+				PeerCertData:        "",
+				PeerCertPath:        "",
+				PeerKeyData:         "",
+				PeerKeyPath:         "",
+				PeerTrustedCAData:   "",
+				PeerTrustedCAPath:   "",
+				SnapshotPath:        "/tmp/etcd-functional-1.snapshot.db",
+			},
+			{
+				EtcdExec:           "./bin/etcd",
+				AgentAddr:          "127.0.0.1:29027",
+				FailpointHTTPAddr:  "http://127.0.0.1:7382",
+				BaseDir:            "/tmp/etcd-functional-2",
+				EtcdClientProxy:    false,
+				EtcdPeerProxy:      true,
+				EtcdClientEndpoint: "127.0.0.1:2379",
+				Etcd: &rpcpb.Etcd{
+					Name:                "s2",
+					DataDir:             "/tmp/etcd-functional-2/etcd.data",
+					WALDir:              "/tmp/etcd-functional-2/etcd.data/member/wal",
+					HeartbeatIntervalMs: 100,
+					ElectionTimeoutMs:   1000,
+					ListenClientURLs:    []string{"https://127.0.0.1:2379"},
+					AdvertiseClientURLs: []string{"https://127.0.0.1:2379"},
+					ClientAutoTLS:       true,
+					ClientCertAuth:      false,
+					ClientCertFile:      "",
+					ClientKeyFile:       "",
+					ClientTrustedCAFile: "",
+					ListenPeerURLs:      []string{"https://127.0.0.1:2380"},
+					AdvertisePeerURLs:   []string{"https://127.0.0.1:2381"},
+					PeerAutoTLS:         true,
+					PeerClientCertAuth:  false,
+					PeerCertFile:        "",
+					PeerKeyFile:         "",
+					PeerTrustedCAFile:   "",
+					InitialCluster:      "s1=https://127.0.0.1:1381,s2=https://127.0.0.1:2381,s3=https://127.0.0.1:3381",
+					InitialClusterState: "new",
+					InitialClusterToken: "tkn",
+					SnapshotCount:       10000,
+					QuotaBackendBytes:   10740000000,
+					PreVote:             true,
+					InitialCorruptCheck: true,
+					Logger:              "zap",
+					LogOutputs:          []string{"/tmp/etcd-functional-2/etcd.log"},
+					Debug:               true,
+				},
+				ClientCertData:      "",
+				ClientCertPath:      "",
+				ClientKeyData:       "",
+				ClientKeyPath:       "",
+				ClientTrustedCAData: "",
+				ClientTrustedCAPath: "",
+				PeerCertData:        "",
+				PeerCertPath:        "",
+				PeerKeyData:         "",
+				PeerKeyPath:         "",
+				PeerTrustedCAData:   "",
+				PeerTrustedCAPath:   "",
+				SnapshotPath:        "/tmp/etcd-functional-2.snapshot.db",
+			},
+			{
+				EtcdExec:           "./bin/etcd",
+				AgentAddr:          "127.0.0.1:39027",
+				FailpointHTTPAddr:  "http://127.0.0.1:7383",
+				BaseDir:            "/tmp/etcd-functional-3",
+				EtcdClientProxy:    false,
+				EtcdPeerProxy:      true,
+				EtcdClientEndpoint: "127.0.0.1:3379",
+				Etcd: &rpcpb.Etcd{
+					Name:                "s3",
+					DataDir:             "/tmp/etcd-functional-3/etcd.data",
+					WALDir:              "/tmp/etcd-functional-3/etcd.data/member/wal",
+					HeartbeatIntervalMs: 100,
+					ElectionTimeoutMs:   1000,
+					ListenClientURLs:    []string{"https://127.0.0.1:3379"},
+					AdvertiseClientURLs: []string{"https://127.0.0.1:3379"},
+					ClientAutoTLS:       true,
+					ClientCertAuth:      false,
+					ClientCertFile:      "",
+					ClientKeyFile:       "",
+					ClientTrustedCAFile: "",
+					ListenPeerURLs:      []string{"https://127.0.0.1:3380"},
+					AdvertisePeerURLs:   []string{"https://127.0.0.1:3381"},
+					PeerAutoTLS:         true,
+					PeerClientCertAuth:  false,
+					PeerCertFile:        "",
+					PeerKeyFile:         "",
+					PeerTrustedCAFile:   "",
+					InitialCluster:      "s1=https://127.0.0.1:1381,s2=https://127.0.0.1:2381,s3=https://127.0.0.1:3381",
+					InitialClusterState: "new",
+					InitialClusterToken: "tkn",
+					SnapshotCount:       10000,
+					QuotaBackendBytes:   10740000000,
+					PreVote:             true,
+					InitialCorruptCheck: true,
+					Logger:              "zap",
+					LogOutputs:          []string{"/tmp/etcd-functional-3/etcd.log"},
+					Debug:               true,
+				},
+				ClientCertData:      "",
+				ClientCertPath:      "",
+				ClientKeyData:       "",
+				ClientKeyPath:       "",
+				ClientTrustedCAData: "",
+				ClientTrustedCAPath: "",
+				PeerCertData:        "",
+				PeerCertPath:        "",
+				PeerKeyData:         "",
+				PeerKeyPath:         "",
+				PeerTrustedCAData:   "",
+				PeerTrustedCAPath:   "",
+				SnapshotPath:        "/tmp/etcd-functional-3.snapshot.db",
+			},
+		},
+		Tester: &rpcpb.Tester{
+			DataDir:               "/tmp/etcd-tester-data",
+			Network:               "tcp",
+			Addr:                  "127.0.0.1:9028",
+			DelayLatencyMs:        5000,
+			DelayLatencyMsRv:      500,
+			UpdatedDelayLatencyMs: 5000,
+			RoundLimit:            1,
+			ExitOnCaseFail:        true,
+			EnablePprof:           true,
+			CaseDelayMs:           7000,
+			CaseShuffle:           true,
+			Cases: []string{
+				"SIGTERM_ONE_FOLLOWER",
+				"SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
+				"SIGTERM_LEADER",
+				"SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT",
+				"SIGTERM_QUORUM",
+				"SIGTERM_ALL",
+				"SIGQUIT_AND_REMOVE_ONE_FOLLOWER",
+				"SIGQUIT_AND_REMOVE_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
+				// "SIGQUIT_AND_REMOVE_LEADER",
+				// "SIGQUIT_AND_REMOVE_LEADER_UNTIL_TRIGGER_SNAPSHOT",
+				// "SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH",
+				// "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER",
+				// "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
+				"BLACKHOLE_PEER_PORT_TX_RX_LEADER",
+				"BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT",
+				"BLACKHOLE_PEER_PORT_TX_RX_QUORUM",
+				"BLACKHOLE_PEER_PORT_TX_RX_ALL",
+				// "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER",
+				// "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER",
+				// "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
+				// "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT",
+				"DELAY_PEER_PORT_TX_RX_LEADER",
+				"RANDOM_DELAY_PEER_PORT_TX_RX_LEADER",
+				"DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT",
+				"RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT",
+				"DELAY_PEER_PORT_TX_RX_QUORUM",
+				"RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM",
+				"DELAY_PEER_PORT_TX_RX_ALL",
+				"RANDOM_DELAY_PEER_PORT_TX_RX_ALL",
+				"NO_FAIL_WITH_STRESS",
+				"NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS",
+			},
+			FailpointCommands: []string{`panic("etcd-tester")`},
+			RunnerExecPath:    "./bin/etcd-runner",
+			ExternalExecPath:  "",
+			Stressers: []*rpcpb.Stresser{
+				{Type: "KV_WRITE_SMALL", Weight: 0.35},
+				{Type: "KV_WRITE_LARGE", Weight: 0.002},
+				{Type: "KV_READ_ONE_KEY", Weight: 0.07},
+				{Type: "KV_READ_RANGE", Weight: 0.07},
+				{Type: "KV_DELETE_ONE_KEY", Weight: 0.07},
+				{Type: "KV_DELETE_RANGE", Weight: 0.07},
+				{Type: "KV_TXN_WRITE_DELETE", Weight: 0.35},
+				{Type: "LEASE", Weight: 0.0},
+			},
+			Checkers:                []string{"KV_HASH", "LEASE_EXPIRE"},
+			StressKeySize:           100,
+			StressKeySizeLarge:      32769,
+			StressKeySuffixRange:    250000,
+			StressKeySuffixRangeTxn: 100,
+			StressKeyTxnOps:         10,
+			StressClients:           100,
+			StressQPS:               2000,
+		},
+	}
+
+	logger, err := zap.NewProduction()
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer logger.Sync()
+
+	cfg, err := read(logger, "../../functional.yaml")
+	if err != nil {
+		t.Fatal(err)
+	}
+	cfg.lg = nil
+
+	if !reflect.DeepEqual(exp, cfg) {
+		t.Fatalf("expected %+v, got %+v", exp, cfg)
+	}
+
+	cfg.lg = logger
+
+	cfg.updateCases()
+	fs1 := cfg.listCases()
+
+	cfg.shuffleCases()
+	fs2 := cfg.listCases()
+	if reflect.DeepEqual(fs1, fs2) {
+		t.Fatalf("expected shuffled failure cases, got %q", fs2)
+	}
+
+	cfg.shuffleCases()
+	fs3 := cfg.listCases()
+	if reflect.DeepEqual(fs2, fs3) {
+		t.Fatalf("expected reshuffled failure cases from %q, got %q", fs2, fs3)
+	}
+
+	// shuffle ensures visit all exactly once
+	// so when sorted, failure cases must be equal
+	sort.Strings(fs1)
+	sort.Strings(fs2)
+	sort.Strings(fs3)
+
+	if !reflect.DeepEqual(fs1, fs2) {
+		t.Fatalf("expected %q, got %q", fs1, fs2)
+	}
+	if !reflect.DeepEqual(fs2, fs3) {
+		t.Fatalf("expected %q, got %q", fs2, fs3)
+	}
+}

+ 59 - 35
functional/tester/stresser.go

@@ -37,40 +37,60 @@ type Stresser interface {
 
 // newStresser creates stresser from a comma separated list of stresser types.
 func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
-	stressers = make([]Stresser, len(clus.Tester.Stressers))
-	for i, stype := range clus.Tester.Stressers {
+	// TODO: Too intensive stressing clients can panic etcd member with
+	// 'out of memory' error. Put rate limits in server side.
+	ks := &keyStresser{
+		lg:                clus.lg,
+		m:                 m,
+		keySize:           int(clus.Tester.StressKeySize),
+		keyLargeSize:      int(clus.Tester.StressKeySizeLarge),
+		keySuffixRange:    int(clus.Tester.StressKeySuffixRange),
+		keyTxnSuffixRange: int(clus.Tester.StressKeySuffixRangeTxn),
+		keyTxnOps:         int(clus.Tester.StressKeyTxnOps),
+		clientsN:          int(clus.Tester.StressClients),
+		rateLimiter:       clus.rateLimiter,
+	}
+	ksExist := false
+
+	for _, s := range clus.Tester.Stressers {
 		clus.lg.Info(
 			"creating stresser",
-			zap.String("type", stype),
+			zap.String("type", s.Type),
+			zap.Float64("weight", s.Weight),
 			zap.String("endpoint", m.EtcdClientEndpoint),
 		)
-
-		switch stype {
-		case "KV":
-			// TODO: Too intensive stressing clients can panic etcd member with
-			// 'out of memory' error. Put rate limits in server side.
-			stressers[i] = &keyStresser{
-				stype:             rpcpb.Stresser_KV,
-				lg:                clus.lg,
-				m:                 m,
-				keySize:           int(clus.Tester.StressKeySize),
-				keyLargeSize:      int(clus.Tester.StressKeySizeLarge),
-				keySuffixRange:    int(clus.Tester.StressKeySuffixRange),
-				keyTxnSuffixRange: int(clus.Tester.StressKeySuffixRangeTxn),
-				keyTxnOps:         int(clus.Tester.StressKeyTxnOps),
-				clientsN:          int(clus.Tester.StressClients),
-				rateLimiter:       clus.rateLimiter,
-			}
+		switch s.Type {
+		case "KV_WRITE_SMALL":
+			ksExist = true
+			ks.weightKVWriteSmall = s.Weight
+		case "KV_WRITE_LARGE":
+			ksExist = true
+			ks.weightKVWriteLarge = s.Weight
+		case "KV_READ_ONE_KEY":
+			ksExist = true
+			ks.weightKVReadOneKey = s.Weight
+		case "KV_READ_RANGE":
+			ksExist = true
+			ks.weightKVReadRange = s.Weight
+		case "KV_DELETE_ONE_KEY":
+			ksExist = true
+			ks.weightKVDeleteOneKey = s.Weight
+		case "KV_DELETE_RANGE":
+			ksExist = true
+			ks.weightKVDeleteRange = s.Weight
+		case "KV_TXN_WRITE_DELETE":
+			ksExist = true
+			ks.weightKVTxnWriteDelete = s.Weight
 
 		case "LEASE":
-			stressers[i] = &leaseStresser{
-				stype:        rpcpb.Stresser_LEASE,
+			stressers = append(stressers, &leaseStresser{
+				stype:        rpcpb.StresserType_LEASE,
 				lg:           clus.lg,
 				m:            m,
 				numLeases:    10, // TODO: configurable
 				keysPerLease: 10, // TODO: configurable
 				rateLimiter:  clus.rateLimiter,
-			}
+			})
 
 		case "ELECTION_RUNNER":
 			reqRate := 100
@@ -83,15 +103,15 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
 				"--rounds=0", // runs forever
 				"--req-rate", fmt.Sprintf("%v", reqRate),
 			}
-			stressers[i] = newRunnerStresser(
-				rpcpb.Stresser_ELECTION_RUNNER,
+			stressers = append(stressers, newRunnerStresser(
+				rpcpb.StresserType_ELECTION_RUNNER,
 				m.EtcdClientEndpoint,
 				clus.lg,
 				clus.Tester.RunnerExecPath,
 				args,
 				clus.rateLimiter,
 				reqRate,
-			)
+			))
 
 		case "WATCH_RUNNER":
 			reqRate := 100
@@ -105,15 +125,15 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
 				"--rounds=0", // runs forever
 				"--req-rate", fmt.Sprintf("%v", reqRate),
 			}
-			stressers[i] = newRunnerStresser(
-				rpcpb.Stresser_WATCH_RUNNER,
+			stressers = append(stressers, newRunnerStresser(
+				rpcpb.StresserType_WATCH_RUNNER,
 				m.EtcdClientEndpoint,
 				clus.lg,
 				clus.Tester.RunnerExecPath,
 				args,
 				clus.rateLimiter,
 				reqRate,
-			)
+			))
 
 		case "LOCK_RACER_RUNNER":
 			reqRate := 100
@@ -125,15 +145,15 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
 				"--rounds=0", // runs forever
 				"--req-rate", fmt.Sprintf("%v", reqRate),
 			}
-			stressers[i] = newRunnerStresser(
-				rpcpb.Stresser_LOCK_RACER_RUNNER,
+			stressers = append(stressers, newRunnerStresser(
+				rpcpb.StresserType_LOCK_RACER_RUNNER,
 				m.EtcdClientEndpoint,
 				clus.lg,
 				clus.Tester.RunnerExecPath,
 				args,
 				clus.rateLimiter,
 				reqRate,
-			)
+			))
 
 		case "LEASE_RUNNER":
 			args := []string{
@@ -141,16 +161,20 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
 				"--ttl=30",
 				"--endpoints", m.EtcdClientEndpoint,
 			}
-			stressers[i] = newRunnerStresser(
-				rpcpb.Stresser_LEASE_RUNNER,
+			stressers = append(stressers, newRunnerStresser(
+				rpcpb.StresserType_LEASE_RUNNER,
 				m.EtcdClientEndpoint,
 				clus.lg,
 				clus.Tester.RunnerExecPath,
 				args,
 				clus.rateLimiter,
 				0,
-			)
+			))
 		}
 	}
+
+	if ksExist {
+		return append(stressers, ks)
+	}
 	return stressers
 }

+ 80 - 64
functional/tester/stresser_key.go

@@ -31,15 +31,23 @@ import (
 	"go.uber.org/zap"
 	"golang.org/x/time/rate"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/transport"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 type keyStresser struct {
-	stype rpcpb.Stresser
-	lg    *zap.Logger
+	lg *zap.Logger
 
 	m *rpcpb.Member
 
+	weightKVWriteSmall     float64
+	weightKVWriteLarge     float64
+	weightKVReadOneKey     float64
+	weightKVReadRange      float64
+	weightKVDeleteOneKey   float64
+	weightKVDeleteRange    float64
+	weightKVTxnWriteDelete float64
+
 	keySize           int
 	keyLargeSize      int
 	keySuffixRange    int
@@ -74,26 +82,16 @@ func (s *keyStresser) Stress() error {
 	s.ctx, s.cancel = context.WithCancel(context.Background())
 
 	s.wg.Add(s.clientsN)
-	var stressEntries = []stressEntry{
-		{weight: 0.7, f: newStressPut(s.cli, s.keySuffixRange, s.keySize)},
-		{
-			weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize),
-			f:      newStressPut(s.cli, s.keySuffixRange, s.keyLargeSize),
-		},
-		{weight: 0.07, f: newStressRange(s.cli, s.keySuffixRange)},
-		{weight: 0.07, f: newStressRangeInterval(s.cli, s.keySuffixRange)},
-		{weight: 0.07, f: newStressDelete(s.cli, s.keySuffixRange)},
-		{weight: 0.07, f: newStressDeleteInterval(s.cli, s.keySuffixRange)},
-	}
-	if s.keyTxnSuffixRange > 0 {
-		// adjust to make up ±70% of workloads with writes
-		stressEntries[0].weight = 0.35
-		stressEntries = append(stressEntries, stressEntry{
-			weight: 0.35,
-			f:      newStressTxn(s.cli, s.keyTxnSuffixRange, s.keyTxnOps),
-		})
-	}
-	s.stressTable = createStressTable(stressEntries)
+
+	s.stressTable = createStressTable([]stressEntry{
+		{weight: s.weightKVWriteSmall, f: newStressPut(s.cli, s.keySuffixRange, s.keySize)},
+		{weight: s.weightKVWriteLarge, f: newStressPut(s.cli, s.keySuffixRange, s.keyLargeSize)},
+		{weight: s.weightKVReadOneKey, f: newStressRange(s.cli, s.keySuffixRange)},
+		{weight: s.weightKVReadRange, f: newStressRangeInterval(s.cli, s.keySuffixRange)},
+		{weight: s.weightKVDeleteOneKey, f: newStressDelete(s.cli, s.keySuffixRange)},
+		{weight: s.weightKVDeleteRange, f: newStressDeleteInterval(s.cli, s.keySuffixRange)},
+		{weight: s.weightKVTxnWriteDelete, f: newStressTxn(s.cli, s.keyTxnSuffixRange, s.keyTxnOps)},
+	})
 
 	s.emu.Lock()
 	s.paused = false
@@ -105,7 +103,7 @@ func (s *keyStresser) Stress() error {
 
 	s.lg.Info(
 		"stress START",
-		zap.String("stress-type", s.stype.String()),
+		zap.String("stress-type", "KV"),
 		zap.String("endpoint", s.m.EtcdClientEndpoint),
 	)
 	return nil
@@ -130,41 +128,7 @@ func (s *keyStresser) run() {
 			continue
 		}
 
-		switch rpctypes.ErrorDesc(err) {
-		case context.DeadlineExceeded.Error():
-			// This retries when request is triggered at the same time as
-			// leader failure. When we terminate the leader, the request to
-			// that leader cannot be processed, and times out. Also requests
-			// to followers cannot be forwarded to the old leader, so timing out
-			// as well. We want to keep stressing until the cluster elects a
-			// new leader and start processing requests again.
-		case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
-			// This retries when request is triggered at the same time as
-			// leader failure and follower nodes receive time out errors
-			// from losing their leader. Followers should retry to connect
-			// to the new leader.
-		case etcdserver.ErrStopped.Error():
-			// one of the etcd nodes stopped from failure injection
-		case transport.ErrConnClosing.Desc:
-			// server closed the transport (failure injected node)
-		case rpctypes.ErrNotCapable.Error():
-			// capability check has not been done (in the beginning)
-		case rpctypes.ErrTooManyRequests.Error():
-			// hitting the recovering member.
-		case context.Canceled.Error():
-			// from stresser.Cancel method:
-			return
-		case grpc.ErrClientConnClosing.Error():
-			// from stresser.Cancel method:
-			return
-		default:
-			s.lg.Warn(
-				"stress run exiting",
-				zap.String("stress-type", s.stype.String()),
-				zap.String("endpoint", s.m.EtcdClientEndpoint),
-				zap.String("error-type", reflect.TypeOf(err).String()),
-				zap.Error(err),
-			)
+		if !s.isRetryableError(err) {
 			return
 		}
 
@@ -177,6 +141,58 @@ func (s *keyStresser) run() {
 	}
 }
 
+func (s *keyStresser) isRetryableError(err error) bool {
+	switch rpctypes.ErrorDesc(err) {
+	// retryable
+	case context.DeadlineExceeded.Error():
+		// This retries when request is triggered at the same time as
+		// leader failure. When we terminate the leader, the request to
+		// that leader cannot be processed, and times out. Also requests
+		// to followers cannot be forwarded to the old leader, so timing out
+		// as well. We want to keep stressing until the cluster elects a
+		// new leader and start processing requests again.
+		return true
+	case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
+		// This retries when request is triggered at the same time as
+		// leader failure and follower nodes receive time out errors
+		// from losing their leader. Followers should retry to connect
+		// to the new leader.
+		return true
+	case etcdserver.ErrStopped.Error():
+		// one of the etcd nodes stopped from failure injection
+		return true
+	case rpctypes.ErrNotCapable.Error():
+		// capability check has not been done (in the beginning)
+		return true
+	case rpctypes.ErrTooManyRequests.Error():
+		// hitting the recovering member.
+		return true
+	// case raft.ErrProposalDropped.Error():
+	// 	// removed member, or leadership has changed (old leader got raftpb.MsgProp)
+	// 	return true
+
+	// not retryable.
+	case context.Canceled.Error():
+		// from stresser.Cancel method:
+		return false
+	}
+
+	if status.Convert(err).Code() == codes.Unavailable {
+		// gRPC connection errors are translated to status.Unavailable
+		return true
+	}
+
+	s.lg.Warn(
+		"stress run exiting",
+		zap.String("stress-type", "KV"),
+		zap.String("endpoint", s.m.EtcdClientEndpoint),
+		zap.String("error-type", reflect.TypeOf(err).String()),
+		zap.String("error-desc", rpctypes.ErrorDesc(err)),
+		zap.Error(err),
+	)
+	return false
+}
+
 func (s *keyStresser) Pause() map[string]int {
 	return s.Close()
 }
@@ -194,7 +210,7 @@ func (s *keyStresser) Close() map[string]int {
 
 	s.lg.Info(
 		"stress STOP",
-		zap.String("stress-type", s.stype.String()),
+		zap.String("stress-type", "KV"),
 		zap.String("endpoint", s.m.EtcdClientEndpoint),
 	)
 	return ess
@@ -207,13 +223,13 @@ func (s *keyStresser) ModifiedKeys() int64 {
 type stressFunc func(ctx context.Context) (err error, modifiedKeys int64)
 
 type stressEntry struct {
-	weight float32
+	weight float64
 	f      stressFunc
 }
 
 type stressTable struct {
 	entries    []stressEntry
-	sumWeights float32
+	sumWeights float64
 }
 
 func createStressTable(entries []stressEntry) *stressTable {
@@ -225,8 +241,8 @@ func createStressTable(entries []stressEntry) *stressTable {
 }
 
 func (st *stressTable) choose() stressFunc {
-	v := rand.Float32() * st.sumWeights
-	var sum float32
+	v := rand.Float64() * st.sumWeights
+	var sum float64
 	var idx int
 	for i := range st.entries {
 		sum += st.entries[i].weight

+ 1 - 1
functional/tester/stresser_lease.go

@@ -38,7 +38,7 @@ const (
 )
 
 type leaseStresser struct {
-	stype rpcpb.Stresser
+	stype rpcpb.StresserType
 	lg    *zap.Logger
 
 	m      *rpcpb.Member

+ 3 - 2
functional/tester/stresser_runner.go

@@ -27,7 +27,7 @@ import (
 )
 
 type runnerStresser struct {
-	stype              rpcpb.Stresser
+	stype              rpcpb.StresserType
 	etcdClientEndpoint string
 	lg                 *zap.Logger
 
@@ -42,7 +42,7 @@ type runnerStresser struct {
 }
 
 func newRunnerStresser(
-	stype rpcpb.Stresser,
+	stype rpcpb.StresserType,
 	ep string,
 	lg *zap.Logger,
 	cmdStr string,
@@ -54,6 +54,7 @@ func newRunnerStresser(
 	return &runnerStresser{
 		stype:              stype,
 		etcdClientEndpoint: ep,
+		lg:                 lg,
 		cmdStr:             cmdStr,
 		args:               args,
 		rl:                 rl,

+ 2 - 0
go.mod

@@ -13,6 +13,7 @@ require (
 	github.com/dgrijalva/jwt-go v3.0.0+incompatible
 	github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4
 	github.com/fatih/color v1.7.0 // indirect
+	github.com/ghodss/yaml v1.0.0 // indirect
 	github.com/gogo/protobuf v1.2.1
 	github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903
 	github.com/golang/protobuf v1.3.2
@@ -46,6 +47,7 @@ require (
 	github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8
 	github.com/urfave/cli v1.18.0
 	github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18
+	go.etcd.io/etcd v3.3.13+incompatible
 	go.uber.org/atomic v1.3.1 // indirect
 	go.uber.org/multierr v1.1.0 // indirect
 	go.uber.org/zap v1.10.0

+ 4 - 0
go.sum

@@ -26,6 +26,8 @@ github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
 github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
 github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
+github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
 github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
 github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
@@ -114,6 +116,8 @@ github.com/urfave/cli v1.18.0 h1:m9MfmZWX7bwr9kUcs/Asr95j0IVXzGNNc+/5ku2m26Q=
 github.com/urfave/cli v1.18.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
 github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 h1:MPPkRncZLN9Kh4MEFmbnK4h3BD7AUmskWv2+EeZJCCs=
 github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
+go.etcd.io/etcd v3.3.13+incompatible h1:jCejD5EMnlGxFvcGRyEV4VGlENZc7oPQX6o0t7n3xbw=
+go.etcd.io/etcd v3.3.13+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI=
 go.uber.org/atomic v1.3.1 h1:U8WaWEmp56LGz7PReduqHRVF6zzs9GbMC2NEZ42dxSQ=
 go.uber.org/atomic v1.3.1/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=

Some files were not shown because too many files changed in this diff