@@ -18,23 +18,24 @@ import (
     "context"
     "errors"
     "fmt"
+    "io"
     "io/ioutil"
     "math/rand"
     "net/http"
     "net/url"
     "path/filepath"
    "strings"
+    "sync"
    "time"
 
+    "github.com/coreos/etcd/functional/rpcpb"
    "github.com/coreos/etcd/pkg/debugutil"
    "github.com/coreos/etcd/pkg/fileutil"
-    "github.com/coreos/etcd/tools/functional-tester/rpcpb"
 
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "go.uber.org/zap"
    "golang.org/x/time/rate"
    "google.golang.org/grpc"
-    yaml "gopkg.in/yaml.v2"
 )
 
 // Cluster defines tester cluster.
@@ -62,221 +63,6 @@ type Cluster struct {
     cs int
 }
 
-func newCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
-    bts, err := ioutil.ReadFile(fpath)
-    if err != nil {
-        return nil, err
-    }
-    lg.Info("opened configuration file", zap.String("path", fpath))
-
-    clus := &Cluster{lg: lg}
-    if err = yaml.Unmarshal(bts, clus); err != nil {
-        return nil, err
-    }
-
-    for i, mem := range clus.Members {
-        if mem.BaseDir == "" {
-            return nil, fmt.Errorf("Members[i].BaseDir cannot be empty (got %q)", mem.BaseDir)
-        }
-        if mem.EtcdLogPath == "" {
-            return nil, fmt.Errorf("Members[i].EtcdLogPath cannot be empty (got %q)", mem.EtcdLogPath)
-        }
-
-        if mem.Etcd.Name == "" {
-            return nil, fmt.Errorf("'--name' cannot be empty (got %+v)", mem)
-        }
-        if mem.Etcd.DataDir == "" {
-            return nil, fmt.Errorf("'--data-dir' cannot be empty (got %+v)", mem)
-        }
-        if mem.Etcd.SnapshotCount == 0 {
-            return nil, fmt.Errorf("'--snapshot-count' cannot be 0 (got %+v)", mem.Etcd.SnapshotCount)
-        }
-        if mem.Etcd.DataDir == "" {
-            return nil, fmt.Errorf("'--data-dir' cannot be empty (got %q)", mem.Etcd.DataDir)
-        }
-        if mem.Etcd.WALDir == "" {
-            clus.Members[i].Etcd.WALDir = filepath.Join(mem.Etcd.DataDir, "member", "wal")
-        }
-
-        if mem.Etcd.HeartbeatIntervalMs == 0 {
-            return nil, fmt.Errorf("'--heartbeat-interval' cannot be 0 (got %+v)", mem.Etcd)
-        }
-        if mem.Etcd.ElectionTimeoutMs == 0 {
-            return nil, fmt.Errorf("'--election-timeout' cannot be 0 (got %+v)", mem.Etcd)
-        }
-        if int64(clus.Tester.DelayLatencyMs) <= mem.Etcd.ElectionTimeoutMs {
-            return nil, fmt.Errorf("delay latency %d ms must be greater than election timeout %d ms", clus.Tester.DelayLatencyMs, mem.Etcd.ElectionTimeoutMs)
-        }
-
-        port := ""
-        listenClientPorts := make([]string, len(clus.Members))
-        for i, u := range mem.Etcd.ListenClientURLs {
-            if !isValidURL(u) {
-                return nil, fmt.Errorf("'--listen-client-urls' has valid URL %q", u)
-            }
-            listenClientPorts[i], err = getPort(u)
-            if err != nil {
-                return nil, fmt.Errorf("'--listen-client-urls' has no port %q", u)
-            }
-        }
-        for i, u := range mem.Etcd.AdvertiseClientURLs {
-            if !isValidURL(u) {
-                return nil, fmt.Errorf("'--advertise-client-urls' has valid URL %q", u)
-            }
-            port, err = getPort(u)
-            if err != nil {
-                return nil, fmt.Errorf("'--advertise-client-urls' has no port %q", u)
-            }
-            if mem.EtcdClientProxy && listenClientPorts[i] == port {
-                return nil, fmt.Errorf("clus.Members[%d] requires client port proxy, but advertise port %q conflicts with listener port %q", i, port, listenClientPorts[i])
-            }
-        }
-
-        listenPeerPorts := make([]string, len(clus.Members))
-        for i, u := range mem.Etcd.ListenPeerURLs {
-            if !isValidURL(u) {
-                return nil, fmt.Errorf("'--listen-peer-urls' has valid URL %q", u)
-            }
-            listenPeerPorts[i], err = getPort(u)
-            if err != nil {
-                return nil, fmt.Errorf("'--listen-peer-urls' has no port %q", u)
-            }
-        }
-        for j, u := range mem.Etcd.AdvertisePeerURLs {
-            if !isValidURL(u) {
-                return nil, fmt.Errorf("'--initial-advertise-peer-urls' has valid URL %q", u)
-            }
-            port, err = getPort(u)
-            if err != nil {
-                return nil, fmt.Errorf("'--initial-advertise-peer-urls' has no port %q", u)
-            }
-            if mem.EtcdPeerProxy && listenPeerPorts[j] == port {
-                return nil, fmt.Errorf("clus.Members[%d] requires peer port proxy, but advertise port %q conflicts with listener port %q", i, port, listenPeerPorts[j])
-            }
-        }
-
-        if !strings.HasPrefix(mem.EtcdLogPath, mem.BaseDir) {
-            return nil, fmt.Errorf("EtcdLogPath must be prefixed with BaseDir (got %q)", mem.EtcdLogPath)
-        }
-        if !strings.HasPrefix(mem.Etcd.DataDir, mem.BaseDir) {
-            return nil, fmt.Errorf("Etcd.DataDir must be prefixed with BaseDir (got %q)", mem.Etcd.DataDir)
-        }
-
-        // TODO: support separate WALDir that can be handled via failure-archive
-        if !strings.HasPrefix(mem.Etcd.WALDir, mem.BaseDir) {
-            return nil, fmt.Errorf("Etcd.WALDir must be prefixed with BaseDir (got %q)", mem.Etcd.WALDir)
-        }
-
-        // TODO: only support generated certs with TLS generator
-        // deprecate auto TLS
-        if mem.Etcd.ClientAutoTLS && mem.Etcd.ClientCertAuth {
-            return nil, fmt.Errorf("Etcd.ClientAutoTLS and Etcd.ClientCertAuth are both 'true'")
-        }
-        if mem.Etcd.ClientAutoTLS && mem.Etcd.ClientCertFile != "" {
-            return nil, fmt.Errorf("Etcd.ClientAutoTLS 'true', but Etcd.ClientCertFile is %q", mem.Etcd.ClientCertFile)
-        }
-        if mem.Etcd.ClientCertAuth && mem.Etcd.ClientCertFile == "" {
-            return nil, fmt.Errorf("Etcd.ClientCertAuth 'true', but Etcd.ClientCertFile is %q", mem.Etcd.PeerCertFile)
-        }
-        if mem.Etcd.ClientAutoTLS && mem.Etcd.ClientKeyFile != "" {
-            return nil, fmt.Errorf("Etcd.ClientAutoTLS 'true', but Etcd.ClientKeyFile is %q", mem.Etcd.ClientKeyFile)
-        }
-        if mem.Etcd.ClientAutoTLS && mem.Etcd.ClientTrustedCAFile != "" {
-            return nil, fmt.Errorf("Etcd.ClientAutoTLS 'true', but Etcd.ClientTrustedCAFile is %q", mem.Etcd.ClientTrustedCAFile)
-        }
-        if mem.Etcd.PeerAutoTLS && mem.Etcd.PeerClientCertAuth {
-            return nil, fmt.Errorf("Etcd.PeerAutoTLS and Etcd.PeerClientCertAuth are both 'true'")
-        }
-        if mem.Etcd.PeerAutoTLS && mem.Etcd.PeerCertFile != "" {
-            return nil, fmt.Errorf("Etcd.PeerAutoTLS 'true', but Etcd.PeerCertFile is %q", mem.Etcd.PeerCertFile)
-        }
-        if mem.Etcd.PeerClientCertAuth && mem.Etcd.PeerCertFile == "" {
-            return nil, fmt.Errorf("Etcd.PeerClientCertAuth 'true', but Etcd.PeerCertFile is %q", mem.Etcd.PeerCertFile)
-        }
-        if mem.Etcd.PeerAutoTLS && mem.Etcd.PeerKeyFile != "" {
-            return nil, fmt.Errorf("Etcd.PeerAutoTLS 'true', but Etcd.PeerKeyFile is %q", mem.Etcd.PeerKeyFile)
-        }
-        if mem.Etcd.PeerAutoTLS && mem.Etcd.PeerTrustedCAFile != "" {
-            return nil, fmt.Errorf("Etcd.PeerAutoTLS 'true', but Etcd.PeerTrustedCAFile is %q", mem.Etcd.PeerTrustedCAFile)
-        }
-
-        if mem.Etcd.ClientAutoTLS || mem.Etcd.ClientCertFile != "" {
-            for _, cu := range mem.Etcd.ListenClientURLs {
-                var u *url.URL
-                u, err = url.Parse(cu)
-                if err != nil {
-                    return nil, err
-                }
-                if u.Scheme != "https" { // TODO: support unix
-                    return nil, fmt.Errorf("client TLS is enabled with wrong scheme %q", cu)
-                }
-            }
-            for _, cu := range mem.Etcd.AdvertiseClientURLs {
-                var u *url.URL
-                u, err = url.Parse(cu)
-                if err != nil {
-                    return nil, err
-                }
-                if u.Scheme != "https" { // TODO: support unix
-                    return nil, fmt.Errorf("client TLS is enabled with wrong scheme %q", cu)
-                }
-            }
-        }
-        if mem.Etcd.PeerAutoTLS || mem.Etcd.PeerCertFile != "" {
-            for _, cu := range mem.Etcd.ListenPeerURLs {
-                var u *url.URL
-                u, err = url.Parse(cu)
-                if err != nil {
-                    return nil, err
-                }
-                if u.Scheme != "https" { // TODO: support unix
-                    return nil, fmt.Errorf("peer TLS is enabled with wrong scheme %q", cu)
-                }
-            }
-            for _, cu := range mem.Etcd.AdvertisePeerURLs {
-                var u *url.URL
-                u, err = url.Parse(cu)
-                if err != nil {
-                    return nil, err
-                }
-                if u.Scheme != "https" { // TODO: support unix
-                    return nil, fmt.Errorf("peer TLS is enabled with wrong scheme %q", cu)
-                }
-            }
-        }
-    }
-
-    if len(clus.Tester.FailureCases) == 0 {
-        return nil, errors.New("FailureCases not found")
-    }
-    if clus.Tester.DelayLatencyMs <= clus.Tester.DelayLatencyMsRv*5 {
-        return nil, fmt.Errorf("delay latency %d ms must be greater than 5x of delay latency random variable %d ms", clus.Tester.DelayLatencyMs, clus.Tester.DelayLatencyMsRv)
-    }
-    if clus.Tester.UpdatedDelayLatencyMs == 0 {
-        clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
-    }
-
-    for _, v := range clus.Tester.FailureCases {
-        if _, ok := rpcpb.FailureCase_value[v]; !ok {
-            return nil, fmt.Errorf("%q is not defined in 'rpcpb.FailureCase_value'", v)
-        }
-    }
-
-    for _, v := range clus.Tester.StressTypes {
-        if _, ok := rpcpb.StressType_value[v]; !ok {
-            return nil, fmt.Errorf("StressType is unknown; got %q", v)
-        }
-    }
-    if clus.Tester.StressKeySuffixRangeTxn > 100 {
-        return nil, fmt.Errorf("StressKeySuffixRangeTxn maximum value is 100, got %v", clus.Tester.StressKeySuffixRangeTxn)
-    }
-    if clus.Tester.StressKeyTxnOps > 64 {
-        return nil, fmt.Errorf("StressKeyTxnOps maximum value is 64, got %v", clus.Tester.StressKeyTxnOps)
-    }
-
-    return clus, err
-}
-
 var dialOpts = []grpc.DialOption{
     grpc.WithInsecure(),
     grpc.WithTimeout(5 * time.Second),
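The constructor removed above did two jobs: unmarshal the YAML tester configuration and validate every member and tester field before anything starts. The next hunk shows `NewCluster` calling a `read` helper instead; since the `yaml` import also leaves this file, `read` presumably lives elsewhere. A minimal sketch of that load-then-validate shape, assuming `read` keeps the removed function's contract (the helper body below is illustrative, not the actual implementation):

```go
// read is assumed to mirror the removed newCluster: unmarshal the
// YAML config into a Cluster, then fail fast on any invalid field.
func read(lg *zap.Logger, fpath string) (*Cluster, error) {
    bts, err := ioutil.ReadFile(fpath)
    if err != nil {
        return nil, err
    }
    clus := &Cluster{lg: lg}
    if err = yaml.Unmarshal(bts, clus); err != nil {
        return nil, err
    }
    for _, mem := range clus.Members {
        if mem.BaseDir == "" {
            return nil, fmt.Errorf("BaseDir cannot be empty (got %q)", mem.BaseDir)
        }
        // ...the remaining per-member, TLS, and tester-level checks...
    }
    return clus, nil
}
```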
@@ -285,7 +71,7 @@ var dialOpts = []grpc.DialOption{
 
 // NewCluster creates a client from a tester configuration.
 func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
-    clus, err := newCluster(lg, fpath)
+    clus, err := read(lg, fpath)
     if err != nil {
         return nil, err
     }
@@ -320,7 +106,7 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
         }
     }
     clus.testerHTTPServer = &http.Server{
-        Addr:    clus.Tester.TesterAddr,
+        Addr:    clus.Tester.Addr,
         Handler: mux,
     }
     go clus.serveTesterServer()
@@ -340,12 +126,12 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
 func (clus *Cluster) serveTesterServer() {
     clus.lg.Info(
         "started tester HTTP server",
-        zap.String("tester-address", clus.Tester.TesterAddr),
+        zap.String("tester-address", clus.Tester.Addr),
     )
     err := clus.testerHTTPServer.ListenAndServe()
     clus.lg.Info(
         "tester HTTP server returned",
-        zap.String("tester-address", clus.Tester.TesterAddr),
+        zap.String("tester-address", clus.Tester.Addr),
         zap.Error(err),
     )
     if err != nil && err != http.ErrServerClosed {
@@ -356,70 +142,98 @@ func (clus *Cluster) serveTesterServer() {
 func (clus *Cluster) updateFailures() {
     for _, cs := range clus.Tester.FailureCases {
         switch cs {
-        case "KILL_ONE_FOLLOWER":
-            clus.failures = append(clus.failures, newFailureKillOneFollower())
-        case "KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
-            clus.failures = append(clus.failures, newFailureKillOneFollowerUntilTriggerSnapshot())
-        case "KILL_LEADER":
-            clus.failures = append(clus.failures, newFailureKillLeader())
-        case "KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT":
-            clus.failures = append(clus.failures, newFailureKillLeaderUntilTriggerSnapshot())
-        case "KILL_QUORUM":
-            clus.failures = append(clus.failures, newFailureKillQuorum())
-        case "KILL_ALL":
-            clus.failures = append(clus.failures, newFailureKillAll())
+        case "SIGTERM_ONE_FOLLOWER":
+            clus.failures = append(clus.failures,
+                new_FailureCase_SIGTERM_ONE_FOLLOWER(clus))
+        case "SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
+            clus.failures = append(clus.failures,
+                new_FailureCase_SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus))
+        case "SIGTERM_LEADER":
+            clus.failures = append(clus.failures,
+                new_FailureCase_SIGTERM_LEADER(clus))
+        case "SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT":
+            clus.failures = append(clus.failures,
+                new_FailureCase_SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus))
+        case "SIGTERM_QUORUM":
+            clus.failures = append(clus.failures,
+                new_FailureCase_SIGTERM_QUORUM(clus))
+        case "SIGTERM_ALL":
+            clus.failures = append(clus.failures,
+                new_FailureCase_SIGTERM_ALL(clus))
 
         case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER":
-            clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollower(clus))
+            clus.failures = append(clus.failures,
+                new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER(clus))
         case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
-            clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollowerUntilTriggerSnapshot())
+            clus.failures = append(clus.failures,
+                new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT())
         case "BLACKHOLE_PEER_PORT_TX_RX_LEADER":
-            clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeader(clus))
+            clus.failures = append(clus.failures,
+                new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_LEADER(clus))
        case "BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
-            clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeaderUntilTriggerSnapshot())
+            clus.failures = append(clus.failures,
+                new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT())
         case "BLACKHOLE_PEER_PORT_TX_RX_QUORUM":
-            clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxQuorum(clus))
+            clus.failures = append(clus.failures,
+                new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_QUORUM(clus))
         case "BLACKHOLE_PEER_PORT_TX_RX_ALL":
-            clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxAll(clus))
+            clus.failures = append(clus.failures,
+                new_FailureCase_BLACKHOLE_PEER_PORT_TX_RX_ALL(clus))
 
         case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
-            clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus, false))
+            clus.failures = append(clus.failures,
+                new_FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER(clus, false))
         case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
-            clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus, true))
+            clus.failures = append(clus.failures,
+                new_FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER(clus, true))
         case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
-            clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus, false))
+            clus.failures = append(clus.failures,
+                new_FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus, false))
         case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
-            clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus, true))
+            clus.failures = append(clus.failures,
+                new_FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus, true))
         case "DELAY_PEER_PORT_TX_RX_LEADER":
-            clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus, false))
+            clus.failures = append(clus.failures,
+                new_FailureCase_DELAY_PEER_PORT_TX_RX_LEADER(clus, false))
         case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER":
-            clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus, true))
+            clus.failures = append(clus.failures,
+                new_FailureCase_DELAY_PEER_PORT_TX_RX_LEADER(clus, true))
         case "DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
-            clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus, false))
+            clus.failures = append(clus.failures,
+                new_FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus, false))
         case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
-            clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus, true))
+            clus.failures = append(clus.failures,
+                new_FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus, true))
         case "DELAY_PEER_PORT_TX_RX_QUORUM":
-            clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxQuorum(clus, false))
+            clus.failures = append(clus.failures,
+                new_FailureCase_DELAY_PEER_PORT_TX_RX_QUORUM(clus, false))
         case "RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM":
-            clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxQuorum(clus, true))
+            clus.failures = append(clus.failures,
+                new_FailureCase_DELAY_PEER_PORT_TX_RX_QUORUM(clus, true))
         case "DELAY_PEER_PORT_TX_RX_ALL":
-            clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus, false))
+            clus.failures = append(clus.failures,
+                new_FailureCase_DELAY_PEER_PORT_TX_RX_ALL(clus, false))
         case "RANDOM_DELAY_PEER_PORT_TX_RX_ALL":
-            clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus, true))
+            clus.failures = append(clus.failures,
+                new_FailureCase_DELAY_PEER_PORT_TX_RX_ALL(clus, true))
 
         case "NO_FAIL_WITH_STRESS":
-            clus.failures = append(clus.failures, newFailureNoFailWithStress(clus))
+            clus.failures = append(clus.failures,
+                new_FailureCase_NO_FAIL_WITH_STRESS(clus))
         case "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS":
-            clus.failures = append(clus.failures, newFailureNoFailWithNoStressForLiveness(clus))
+            clus.failures = append(clus.failures,
+                new_FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS(clus))
 
         case "EXTERNAL":
-            clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath))
+            clus.failures = append(clus.failures,
+                new_FailureCase_EXTERNAL(clus.Tester.ExternalExecPath))
         case "FAILPOINTS":
             fpFailures, fperr := failpointFailures(clus)
             if len(fpFailures) == 0 {
                 clus.lg.Info("no failpoints found!", zap.Error(fperr))
             }
-            clus.failures = append(clus.failures, fpFailures...)
+            clus.failures = append(clus.failures,
+                fpFailures...)
         }
     }
 }
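Every case string in this switch is a generated `rpcpb.FailureCase` enum name, and each constructor is now named `new_FailureCase_<ENUM>` to match one-for-one, so configuration, enum, and code stay mechanically aligned. Protobuf-generated enums expose a name-to-value map, which lets a configuration be rejected up front instead of silently falling through the switch; the removed `newCluster` used exactly this check. A short illustrative guard (the `FailureCase_value` map is generated by protoc; the wrapper function is hypothetical):

```go
// validateFailureCases rejects any case string that is not a known
// rpcpb.FailureCase enum name, so the switch in updateFailures can
// assume every string it sees is dispatchable.
func validateFailureCases(cases []string) error {
    for _, v := range cases {
        if _, ok := rpcpb.FailureCase_value[v]; !ok {
            return fmt.Errorf("%q is not defined in 'rpcpb.FailureCase_value'", v)
        }
    }
    return nil
}
```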
@@ -444,48 +258,6 @@ func (clus *Cluster) UpdateDelayLatencyMs() {
     }
 }
 
-func (clus *Cluster) shuffleFailures() {
-    rand.Seed(time.Now().UnixNano())
-    offset := rand.Intn(1000)
-    n := len(clus.failures)
-    cp := coprime(n)
-
-    fs := make([]Failure, n)
-    for i := 0; i < n; i++ {
-        fs[i] = clus.failures[(cp*i+offset)%n]
-    }
-    clus.failures = fs
-    clus.lg.Info("shuffled test failure cases", zap.Int("total", n))
-}
-
-/*
-x and y of GCD 1 are coprime to each other
-
-x1 = ( coprime of n * idx1 + offset ) % n
-x2 = ( coprime of n * idx2 + offset ) % n
-(x2 - x1) = coprime of n * (idx2 - idx1) % n
-          = (idx2 - idx1) = 1
-
-Consecutive x's are guaranteed to be distinct
-*/
-func coprime(n int) int {
-    coprime := 1
-    for i := n / 2; i < n; i++ {
-        if gcd(i, n) == 1 {
-            coprime = i
-            break
-        }
-    }
-    return coprime
-}
-
-func gcd(x, y int) int {
-    if y == 0 {
-        return x
-    }
-    return gcd(y, x%y)
-}
-
 func (clus *Cluster) updateStresserChecker() {
     cs := &compositeStresser{}
     for _, m := range clus.Members {
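The deleted `shuffleFailures` permuted the failure list by mapping index `i` to `(cp*i + offset) % n`. Because `gcd(cp, n) == 1`, multiplication by `cp` is invertible modulo `n`, so the mapping is a bijection: with `n = 6`, `cp = 5`, `offset = 2`, indices 0..5 land on 2, 1, 0, 5, 4, 3 with no repeats. If shuffling is ever reintroduced, the standard library (Go 1.10+) gives a uniform permutation directly; a sketch against the `failures` slice from this file:

```go
// Shuffle the failure cases in place. rand.Shuffle performs a
// Fisher-Yates pass, which is uniform over all n! orderings;
// the coprime-stride trick only reaches a small subset of them.
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(clus.failures), func(i, j int) {
    clus.failures[i], clus.failures[j] = clus.failures[j], clus.failures[i]
})
```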
@@ -502,11 +274,7 @@ func (clus *Cluster) updateStresserChecker() {
         clus.checker = newNoChecker()
     }
 
-    clus.lg.Info(
-        "updated stressers",
-        zap.Int("round", clus.rd),
-        zap.Int("case", clus.cs),
-    )
+    clus.lg.Info("updated stressers")
 }
 
 func (clus *Cluster) checkConsistency() (err error) {
@@ -542,45 +310,74 @@ func (clus *Cluster) checkConsistency() (err error) {
     return err
 }
 
-// Bootstrap bootstraps etcd cluster the very first time.
+// Send_INITIAL_START_ETCD bootstraps etcd cluster the very first time.
 // After this, just continue to call kill/restart.
-func (clus *Cluster) Bootstrap() error {
+func (clus *Cluster) Send_INITIAL_START_ETCD() error {
     // this is the only time that creates request from scratch
-    return clus.broadcastOperation(rpcpb.Operation_InitialStartEtcd)
+    return clus.broadcast(rpcpb.Operation_INITIAL_START_ETCD)
 }
 
-// FailArchive sends "FailArchive" operation.
-func (clus *Cluster) FailArchive() error {
-    return clus.broadcastOperation(rpcpb.Operation_FailArchive)
+// send_SIGQUIT_ETCD_AND_ARCHIVE_DATA sends "SIGQUIT_ETCD_AND_ARCHIVE_DATA" operation.
+func (clus *Cluster) send_SIGQUIT_ETCD_AND_ARCHIVE_DATA() error {
+    return clus.broadcast(rpcpb.Operation_SIGQUIT_ETCD_AND_ARCHIVE_DATA)
 }
 
-// Restart sends "Restart" operation.
-func (clus *Cluster) Restart() error {
-    return clus.broadcastOperation(rpcpb.Operation_RestartEtcd)
+// send_RESTART_ETCD sends "RESTART_ETCD" operation.
+func (clus *Cluster) send_RESTART_ETCD() error {
+    return clus.broadcast(rpcpb.Operation_RESTART_ETCD)
 }
 
-func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error {
+func (clus *Cluster) broadcast(op rpcpb.Operation) error {
+    var wg sync.WaitGroup
+    wg.Add(len(clus.agentStreams))
+
+    errc := make(chan error, len(clus.agentStreams))
     for i := range clus.agentStreams {
-        err := clus.sendOperation(i, op)
+        go func(idx int, o rpcpb.Operation) {
+            defer wg.Done()
+            errc <- clus.sendOp(idx, o)
+        }(i, op)
+    }
+    wg.Wait()
+    close(errc)
+
+    errs := []string{}
+    for err := range errc {
+        if err == nil {
+            continue
+        }
+
         if err != nil {
-            if op == rpcpb.Operation_DestroyEtcdAgent &&
-                strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") {
-                // agent server has already closed;
-                // so this error is expected
-                clus.lg.Info(
-                    "successfully destroyed",
-                    zap.String("member", clus.Members[i].EtcdClientEndpoint),
-                )
-                continue
+            destroyed := false
+            if op == rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT {
+                if err == io.EOF {
+                    destroyed = true
+                }
+                if strings.Contains(err.Error(),
+                    "rpc error: code = Unavailable desc = transport is closing") {
+                    // agent server has already closed;
+                    // so this error is expected
+                    destroyed = true
+                }
+                if strings.Contains(err.Error(),
+                    "desc = os: process already finished") {
+                    destroyed = true
+                }
+            }
+            if !destroyed {
+                errs = append(errs, err.Error())
             }
-            return err
         }
     }
-    return nil
+
+    if len(errs) == 0 {
+        return nil
+    }
+    return errors.New(strings.Join(errs, ", "))
 }
 
-func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
-    if op == rpcpb.Operation_InitialStartEtcd {
+func (clus *Cluster) sendOp(idx int, op rpcpb.Operation) error {
+    if op == rpcpb.Operation_INITIAL_START_ETCD {
         clus.agentRequests[idx] = &rpcpb.Request{
             Operation: op,
             Member:    clus.Members[idx],
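`broadcast` now fans the operation out to every agent concurrently instead of looping serially: one goroutine per stream, each sending its result into an error channel buffered to `len(clus.agentStreams)` so no sender can block, with the channel closed only after `wg.Wait()` so the collecting loop terminates. Errors that are expected during teardown (`io.EOF`, a closing gRPC transport, an already-finished process) are swallowed, but only for the stop-agent operation. The same pattern in isolation (the function and parameter names here are illustrative, not part of the tester):

```go
// fanOut runs send(i) for n targets concurrently and joins any
// failures into one error. Buffering the channel to n guarantees
// every goroutine can deliver its result before the reader starts.
func fanOut(n int, send func(i int) error) error {
    var wg sync.WaitGroup
    wg.Add(n)
    errc := make(chan error, n)
    for i := 0; i < n; i++ {
        go func(idx int) {
            defer wg.Done()
            errc <- send(idx)
        }(i)
    }
    wg.Wait()
    close(errc)

    var errs []string
    for err := range errc {
        if err != nil {
            errs = append(errs, err.Error())
        }
    }
    if len(errs) == 0 {
        return nil
    }
    return errors.New(strings.Join(errs, ", "))
}
```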
@@ -639,9 +436,9 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
     }
 
     // store TLS assets from agents/servers onto disk
-    if secure && (op == rpcpb.Operation_InitialStartEtcd || op == rpcpb.Operation_RestartEtcd) {
+    if secure && (op == rpcpb.Operation_INITIAL_START_ETCD || op == rpcpb.Operation_RESTART_ETCD) {
         dirClient := filepath.Join(
-            clus.Tester.TesterDataDir,
+            clus.Tester.DataDir,
             clus.Members[idx].Etcd.Name,
             "fixtures",
             "client",
@@ -699,9 +496,9 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
     return nil
 }
 
-// DestroyEtcdAgents terminates all tester connections to agents and etcd servers.
-func (clus *Cluster) DestroyEtcdAgents() {
-    err := clus.broadcastOperation(rpcpb.Operation_DestroyEtcdAgent)
+// Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT terminates all tester connections to agents and etcd servers.
+func (clus *Cluster) Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() {
+    err := clus.broadcast(rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT)
     if err != nil {
         clus.lg.Warn("destroying etcd/agents FAIL", zap.Error(err))
     } else {
@@ -717,7 +514,7 @@ func (clus *Cluster) DestroyEtcdAgents() {
         ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
         err := clus.testerHTTPServer.Shutdown(ctx)
         cancel()
-        clus.lg.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr), zap.Error(err))
+        clus.lg.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.Addr), zap.Error(err))
     }
 }
 
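The shutdown path above bounds how long the tester waits on in-flight HTTP requests: `Shutdown` stops accepting new connections and returns once active requests finish or the context expires, after which the `ListenAndServe` call in `serveTesterServer` returns `http.ErrServerClosed`, which that goroutine already treats as a clean exit. The pattern in miniature (`srv` is any `*http.Server`; a sketch, not tester code):

```go
// Give in-flight requests up to three seconds, then give up.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
    // A context.DeadlineExceeded here means requests were still running.
    log.Printf("HTTP server shutdown: %v", err)
}
```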
@@ -886,6 +683,7 @@ func (clus *Cluster) defrag() error {
         "defrag ALL PASS",
         zap.Int("round", clus.rd),
         zap.Int("case", clus.cs),
+        zap.Int("case-total", len(clus.failures)),
     )
     return nil
 }