cluster.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "io/ioutil"
  20. "math/rand"
  21. "net/http"
  22. "net/url"
  23. "path/filepath"
  24. "strings"
  25. "time"
  26. "github.com/coreos/etcd/pkg/debugutil"
  27. "github.com/coreos/etcd/pkg/fileutil"
  28. "github.com/coreos/etcd/tools/functional-tester/rpcpb"
  29. "github.com/prometheus/client_golang/prometheus/promhttp"
  30. "go.uber.org/zap"
  31. "golang.org/x/time/rate"
  32. "google.golang.org/grpc"
  33. yaml "gopkg.in/yaml.v2"
  34. )
  35. // Cluster defines tester cluster.
  36. type Cluster struct {
  37. lg *zap.Logger
  38. agentConns []*grpc.ClientConn
  39. agentClients []rpcpb.TransportClient
  40. agentStreams []rpcpb.Transport_TransportClient
  41. agentRequests []*rpcpb.Request
  42. testerHTTPServer *http.Server
  43. Members []*rpcpb.Member `yaml:"agent-configs"`
  44. Tester *rpcpb.Tester `yaml:"tester-config"`
  45. failures []Failure
  46. rateLimiter *rate.Limiter
  47. stresser Stresser
  48. checker Checker
  49. currentRevision int64
  50. rd int
  51. cs int
  52. }
  53. func newCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
  54. bts, err := ioutil.ReadFile(fpath)
  55. if err != nil {
  56. return nil, err
  57. }
  58. lg.Info("opened configuration file", zap.String("path", fpath))
  59. clus := &Cluster{lg: lg}
  60. if err = yaml.Unmarshal(bts, clus); err != nil {
  61. return nil, err
  62. }
  63. for i, mem := range clus.Members {
  64. if mem.BaseDir == "" {
  65. return nil, fmt.Errorf("Members[i].BaseDir cannot be empty (got %q)", mem.BaseDir)
  66. }
  67. if mem.EtcdLogPath == "" {
  68. return nil, fmt.Errorf("Members[i].EtcdLogPath cannot be empty (got %q)", mem.EtcdLogPath)
  69. }
  70. if mem.Etcd.Name == "" {
  71. return nil, fmt.Errorf("'--name' cannot be empty (got %+v)", mem)
  72. }
  73. if mem.Etcd.DataDir == "" {
  74. return nil, fmt.Errorf("'--data-dir' cannot be empty (got %+v)", mem)
  75. }
  76. if mem.Etcd.SnapshotCount == 0 {
  77. return nil, fmt.Errorf("'--snapshot-count' cannot be 0 (got %+v)", mem.Etcd.SnapshotCount)
  78. }
  79. if mem.Etcd.DataDir == "" {
  80. return nil, fmt.Errorf("'--data-dir' cannot be empty (got %q)", mem.Etcd.DataDir)
  81. }
  82. if mem.Etcd.WALDir == "" {
  83. clus.Members[i].Etcd.WALDir = filepath.Join(mem.Etcd.DataDir, "member", "wal")
  84. }
  85. if mem.Etcd.HeartbeatIntervalMs == 0 {
  86. return nil, fmt.Errorf("'--heartbeat-interval' cannot be 0 (got %+v)", mem.Etcd)
  87. }
  88. if mem.Etcd.ElectionTimeoutMs == 0 {
  89. return nil, fmt.Errorf("'--election-timeout' cannot be 0 (got %+v)", mem.Etcd)
  90. }
  91. if int64(clus.Tester.DelayLatencyMs) <= mem.Etcd.ElectionTimeoutMs {
  92. return nil, fmt.Errorf("delay latency %d ms must be greater than election timeout %d ms", clus.Tester.DelayLatencyMs, mem.Etcd.ElectionTimeoutMs)
  93. }
  94. port := ""
  95. listenClientPorts := make([]string, len(clus.Members))
  96. for i, u := range mem.Etcd.ListenClientURLs {
  97. if !isValidURL(u) {
  98. return nil, fmt.Errorf("'--listen-client-urls' has valid URL %q", u)
  99. }
  100. listenClientPorts[i], err = getPort(u)
  101. if err != nil {
  102. return nil, fmt.Errorf("'--listen-client-urls' has no port %q", u)
  103. }
  104. }
  105. for i, u := range mem.Etcd.AdvertiseClientURLs {
  106. if !isValidURL(u) {
  107. return nil, fmt.Errorf("'--advertise-client-urls' has valid URL %q", u)
  108. }
  109. port, err = getPort(u)
  110. if err != nil {
  111. return nil, fmt.Errorf("'--advertise-client-urls' has no port %q", u)
  112. }
  113. if mem.EtcdClientProxy && listenClientPorts[i] == port {
  114. return nil, fmt.Errorf("clus.Members[%d] requires client port proxy, but advertise port %q conflicts with listener port %q", i, port, listenClientPorts[i])
  115. }
  116. }
  117. listenPeerPorts := make([]string, len(clus.Members))
  118. for i, u := range mem.Etcd.ListenPeerURLs {
  119. if !isValidURL(u) {
  120. return nil, fmt.Errorf("'--listen-peer-urls' has valid URL %q", u)
  121. }
  122. listenPeerPorts[i], err = getPort(u)
  123. if err != nil {
  124. return nil, fmt.Errorf("'--listen-peer-urls' has no port %q", u)
  125. }
  126. }
  127. for j, u := range mem.Etcd.AdvertisePeerURLs {
  128. if !isValidURL(u) {
  129. return nil, fmt.Errorf("'--initial-advertise-peer-urls' has valid URL %q", u)
  130. }
  131. port, err = getPort(u)
  132. if err != nil {
  133. return nil, fmt.Errorf("'--initial-advertise-peer-urls' has no port %q", u)
  134. }
  135. if mem.EtcdPeerProxy && listenPeerPorts[j] == port {
  136. return nil, fmt.Errorf("clus.Members[%d] requires peer port proxy, but advertise port %q conflicts with listener port %q", i, port, listenPeerPorts[j])
  137. }
  138. }
  139. if !strings.HasPrefix(mem.EtcdLogPath, mem.BaseDir) {
  140. return nil, fmt.Errorf("EtcdLogPath must be prefixed with BaseDir (got %q)", mem.EtcdLogPath)
  141. }
  142. if !strings.HasPrefix(mem.Etcd.DataDir, mem.BaseDir) {
  143. return nil, fmt.Errorf("Etcd.DataDir must be prefixed with BaseDir (got %q)", mem.Etcd.DataDir)
  144. }
  145. // TODO: support separate WALDir that can be handled via failure-archive
  146. if !strings.HasPrefix(mem.Etcd.WALDir, mem.BaseDir) {
  147. return nil, fmt.Errorf("Etcd.WALDir must be prefixed with BaseDir (got %q)", mem.Etcd.WALDir)
  148. }
  149. // TODO: only support generated certs with TLS generator
  150. // deprecate auto TLS
  151. if mem.Etcd.ClientAutoTLS && mem.Etcd.ClientCertAuth {
  152. return nil, fmt.Errorf("Etcd.ClientAutoTLS and Etcd.ClientCertAuth are both 'true'")
  153. }
  154. if mem.Etcd.ClientAutoTLS && mem.Etcd.ClientCertFile != "" {
  155. return nil, fmt.Errorf("Etcd.ClientAutoTLS 'true', but Etcd.ClientCertFile is %q", mem.Etcd.ClientCertFile)
  156. }
  157. if mem.Etcd.ClientCertAuth && mem.Etcd.ClientCertFile == "" {
  158. return nil, fmt.Errorf("Etcd.ClientCertAuth 'true', but Etcd.ClientCertFile is %q", mem.Etcd.PeerCertFile)
  159. }
  160. if mem.Etcd.ClientAutoTLS && mem.Etcd.ClientKeyFile != "" {
  161. return nil, fmt.Errorf("Etcd.ClientAutoTLS 'true', but Etcd.ClientKeyFile is %q", mem.Etcd.ClientKeyFile)
  162. }
  163. if mem.Etcd.ClientAutoTLS && mem.Etcd.ClientTrustedCAFile != "" {
  164. return nil, fmt.Errorf("Etcd.ClientAutoTLS 'true', but Etcd.ClientTrustedCAFile is %q", mem.Etcd.ClientTrustedCAFile)
  165. }
  166. if mem.Etcd.PeerAutoTLS && mem.Etcd.PeerClientCertAuth {
  167. return nil, fmt.Errorf("Etcd.PeerAutoTLS and Etcd.PeerClientCertAuth are both 'true'")
  168. }
  169. if mem.Etcd.PeerAutoTLS && mem.Etcd.PeerCertFile != "" {
  170. return nil, fmt.Errorf("Etcd.PeerAutoTLS 'true', but Etcd.PeerCertFile is %q", mem.Etcd.PeerCertFile)
  171. }
  172. if mem.Etcd.PeerClientCertAuth && mem.Etcd.PeerCertFile == "" {
  173. return nil, fmt.Errorf("Etcd.PeerClientCertAuth 'true', but Etcd.PeerCertFile is %q", mem.Etcd.PeerCertFile)
  174. }
  175. if mem.Etcd.PeerAutoTLS && mem.Etcd.PeerKeyFile != "" {
  176. return nil, fmt.Errorf("Etcd.PeerAutoTLS 'true', but Etcd.PeerKeyFile is %q", mem.Etcd.PeerKeyFile)
  177. }
  178. if mem.Etcd.PeerAutoTLS && mem.Etcd.PeerTrustedCAFile != "" {
  179. return nil, fmt.Errorf("Etcd.PeerAutoTLS 'true', but Etcd.PeerTrustedCAFile is %q", mem.Etcd.PeerTrustedCAFile)
  180. }
  181. if mem.Etcd.ClientAutoTLS || mem.Etcd.ClientCertFile != "" {
  182. for _, cu := range mem.Etcd.ListenClientURLs {
  183. var u *url.URL
  184. u, err = url.Parse(cu)
  185. if err != nil {
  186. return nil, err
  187. }
  188. if u.Scheme != "https" { // TODO: support unix
  189. return nil, fmt.Errorf("client TLS is enabled with wrong scheme %q", cu)
  190. }
  191. }
  192. for _, cu := range mem.Etcd.AdvertiseClientURLs {
  193. var u *url.URL
  194. u, err = url.Parse(cu)
  195. if err != nil {
  196. return nil, err
  197. }
  198. if u.Scheme != "https" { // TODO: support unix
  199. return nil, fmt.Errorf("client TLS is enabled with wrong scheme %q", cu)
  200. }
  201. }
  202. }
  203. if mem.Etcd.PeerAutoTLS || mem.Etcd.PeerCertFile != "" {
  204. for _, cu := range mem.Etcd.ListenPeerURLs {
  205. var u *url.URL
  206. u, err = url.Parse(cu)
  207. if err != nil {
  208. return nil, err
  209. }
  210. if u.Scheme != "https" { // TODO: support unix
  211. return nil, fmt.Errorf("peer TLS is enabled with wrong scheme %q", cu)
  212. }
  213. }
  214. for _, cu := range mem.Etcd.AdvertisePeerURLs {
  215. var u *url.URL
  216. u, err = url.Parse(cu)
  217. if err != nil {
  218. return nil, err
  219. }
  220. if u.Scheme != "https" { // TODO: support unix
  221. return nil, fmt.Errorf("peer TLS is enabled with wrong scheme %q", cu)
  222. }
  223. }
  224. }
  225. }
  226. if len(clus.Tester.FailureCases) == 0 {
  227. return nil, errors.New("FailureCases not found")
  228. }
  229. if clus.Tester.DelayLatencyMs <= clus.Tester.DelayLatencyMsRv*5 {
  230. return nil, fmt.Errorf("delay latency %d ms must be greater than 5x of delay latency random variable %d ms", clus.Tester.DelayLatencyMs, clus.Tester.DelayLatencyMsRv)
  231. }
  232. if clus.Tester.UpdatedDelayLatencyMs == 0 {
  233. clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
  234. }
  235. for _, v := range clus.Tester.FailureCases {
  236. if _, ok := rpcpb.FailureCase_value[v]; !ok {
  237. return nil, fmt.Errorf("%q is not defined in 'rpcpb.FailureCase_value'", v)
  238. }
  239. }
  240. for _, v := range clus.Tester.StressTypes {
  241. if _, ok := rpcpb.StressType_value[v]; !ok {
  242. return nil, fmt.Errorf("StressType is unknown; got %q", v)
  243. }
  244. }
  245. if clus.Tester.StressKeySuffixRangeTxn > 100 {
  246. return nil, fmt.Errorf("StressKeySuffixRangeTxn maximum value is 100, got %v", clus.Tester.StressKeySuffixRangeTxn)
  247. }
  248. if clus.Tester.StressKeyTxnOps > 64 {
  249. return nil, fmt.Errorf("StressKeyTxnOps maximum value is 64, got %v", clus.Tester.StressKeyTxnOps)
  250. }
  251. return clus, err
  252. }
  253. var dialOpts = []grpc.DialOption{
  254. grpc.WithInsecure(),
  255. grpc.WithTimeout(5 * time.Second),
  256. grpc.WithBlock(),
  257. }
  258. // NewCluster creates a client from a tester configuration.
  259. func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
  260. clus, err := newCluster(lg, fpath)
  261. if err != nil {
  262. return nil, err
  263. }
  264. clus.agentConns = make([]*grpc.ClientConn, len(clus.Members))
  265. clus.agentClients = make([]rpcpb.TransportClient, len(clus.Members))
  266. clus.agentStreams = make([]rpcpb.Transport_TransportClient, len(clus.Members))
  267. clus.agentRequests = make([]*rpcpb.Request, len(clus.Members))
  268. clus.failures = make([]Failure, 0)
  269. for i, ap := range clus.Members {
  270. var err error
  271. clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...)
  272. if err != nil {
  273. return nil, err
  274. }
  275. clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i])
  276. clus.lg.Info("connected", zap.String("agent-address", ap.AgentAddr))
  277. clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background())
  278. if err != nil {
  279. return nil, err
  280. }
  281. clus.lg.Info("created stream", zap.String("agent-address", ap.AgentAddr))
  282. }
  283. mux := http.NewServeMux()
  284. mux.Handle("/metrics", promhttp.Handler())
  285. if clus.Tester.EnablePprof {
  286. for p, h := range debugutil.PProfHandlers() {
  287. mux.Handle(p, h)
  288. }
  289. }
  290. clus.testerHTTPServer = &http.Server{
  291. Addr: clus.Tester.TesterAddr,
  292. Handler: mux,
  293. }
  294. go clus.serveTesterServer()
  295. clus.updateFailures()
  296. clus.rateLimiter = rate.NewLimiter(
  297. rate.Limit(int(clus.Tester.StressQPS)),
  298. int(clus.Tester.StressQPS),
  299. )
  300. clus.updateStresserChecker()
  301. return clus, nil
  302. }
  303. func (clus *Cluster) serveTesterServer() {
  304. clus.lg.Info(
  305. "started tester HTTP server",
  306. zap.String("tester-address", clus.Tester.TesterAddr),
  307. )
  308. err := clus.testerHTTPServer.ListenAndServe()
  309. clus.lg.Info(
  310. "tester HTTP server returned",
  311. zap.String("tester-address", clus.Tester.TesterAddr),
  312. zap.Error(err),
  313. )
  314. if err != nil && err != http.ErrServerClosed {
  315. clus.lg.Fatal("tester HTTP errored", zap.Error(err))
  316. }
  317. }
  318. func (clus *Cluster) updateFailures() {
  319. for _, cs := range clus.Tester.FailureCases {
  320. switch cs {
  321. case "KILL_ONE_FOLLOWER":
  322. clus.failures = append(clus.failures, newFailureKillOneFollower())
  323. case "KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
  324. clus.failures = append(clus.failures, newFailureKillOneFollowerUntilTriggerSnapshot())
  325. case "KILL_LEADER":
  326. clus.failures = append(clus.failures, newFailureKillLeader())
  327. case "KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT":
  328. clus.failures = append(clus.failures, newFailureKillLeaderUntilTriggerSnapshot())
  329. case "KILL_QUORUM":
  330. clus.failures = append(clus.failures, newFailureKillQuorum())
  331. case "KILL_ALL":
  332. clus.failures = append(clus.failures, newFailureKillAll())
  333. case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER":
  334. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollower(clus))
  335. case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
  336. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollowerUntilTriggerSnapshot())
  337. case "BLACKHOLE_PEER_PORT_TX_RX_LEADER":
  338. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeader(clus))
  339. case "BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
  340. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeaderUntilTriggerSnapshot())
  341. case "BLACKHOLE_PEER_PORT_TX_RX_QUORUM":
  342. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxQuorum(clus))
  343. case "BLACKHOLE_PEER_PORT_TX_RX_ALL":
  344. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxAll(clus))
  345. case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
  346. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus, false))
  347. case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
  348. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus, true))
  349. case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
  350. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus, false))
  351. case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
  352. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus, true))
  353. case "DELAY_PEER_PORT_TX_RX_LEADER":
  354. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus, false))
  355. case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER":
  356. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus, true))
  357. case "DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
  358. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus, false))
  359. case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
  360. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus, true))
  361. case "DELAY_PEER_PORT_TX_RX_QUORUM":
  362. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxQuorum(clus, false))
  363. case "RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM":
  364. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxQuorum(clus, true))
  365. case "DELAY_PEER_PORT_TX_RX_ALL":
  366. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus, false))
  367. case "RANDOM_DELAY_PEER_PORT_TX_RX_ALL":
  368. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus, true))
  369. case "NO_FAIL_WITH_STRESS":
  370. clus.failures = append(clus.failures, newFailureNoFailWithStress(clus))
  371. case "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS":
  372. clus.failures = append(clus.failures, newFailureNoFailWithNoStressForLiveness(clus))
  373. case "EXTERNAL":
  374. clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath))
  375. case "FAILPOINTS":
  376. fpFailures, fperr := failpointFailures(clus)
  377. if len(fpFailures) == 0 {
  378. clus.lg.Info("no failpoints found!", zap.Error(fperr))
  379. }
  380. clus.failures = append(clus.failures, fpFailures...)
  381. }
  382. }
  383. }
  384. func (clus *Cluster) failureStrings() (fs []string) {
  385. fs = make([]string, len(clus.failures))
  386. for i := range clus.failures {
  387. fs[i] = clus.failures[i].Desc()
  388. }
  389. return fs
  390. }
  391. // UpdateDelayLatencyMs updates delay latency with random value
  392. // within election timeout.
  393. func (clus *Cluster) UpdateDelayLatencyMs() {
  394. rand.Seed(time.Now().UnixNano())
  395. clus.Tester.UpdatedDelayLatencyMs = uint32(rand.Int63n(clus.Members[0].Etcd.ElectionTimeoutMs))
  396. minLatRv := clus.Tester.DelayLatencyMsRv + clus.Tester.DelayLatencyMsRv/5
  397. if clus.Tester.UpdatedDelayLatencyMs <= minLatRv {
  398. clus.Tester.UpdatedDelayLatencyMs += minLatRv
  399. }
  400. }
  401. func (clus *Cluster) shuffleFailures() {
  402. rand.Seed(time.Now().UnixNano())
  403. offset := rand.Intn(1000)
  404. n := len(clus.failures)
  405. cp := coprime(n)
  406. fs := make([]Failure, n)
  407. for i := 0; i < n; i++ {
  408. fs[i] = clus.failures[(cp*i+offset)%n]
  409. }
  410. clus.failures = fs
  411. clus.lg.Info("shuffled test failure cases", zap.Int("total", n))
  412. }
  413. /*
  414. x and y of GCD 1 are coprime to each other
  415. x1 = ( coprime of n * idx1 + offset ) % n
  416. x2 = ( coprime of n * idx2 + offset ) % n
  417. (x2 - x1) = coprime of n * (idx2 - idx1) % n
  418. = (idx2 - idx1) = 1
  419. Consecutive x's are guaranteed to be distinct
  420. */
  421. func coprime(n int) int {
  422. coprime := 1
  423. for i := n / 2; i < n; i++ {
  424. if gcd(i, n) == 1 {
  425. coprime = i
  426. break
  427. }
  428. }
  429. return coprime
  430. }
  431. func gcd(x, y int) int {
  432. if y == 0 {
  433. return x
  434. }
  435. return gcd(y, x%y)
  436. }
  437. func (clus *Cluster) updateStresserChecker() {
  438. cs := &compositeStresser{}
  439. for _, m := range clus.Members {
  440. cs.stressers = append(cs.stressers, newStresser(clus, m))
  441. }
  442. clus.stresser = cs
  443. if clus.Tester.ConsistencyCheck {
  444. clus.checker = newHashChecker(clus.lg, hashAndRevGetter(clus))
  445. if schk := cs.Checker(); schk != nil {
  446. clus.checker = newCompositeChecker([]Checker{clus.checker, schk})
  447. }
  448. } else {
  449. clus.checker = newNoChecker()
  450. }
  451. clus.lg.Info(
  452. "updated stressers",
  453. zap.Int("round", clus.rd),
  454. zap.Int("case", clus.cs),
  455. )
  456. }
  457. func (clus *Cluster) checkConsistency() (err error) {
  458. defer func() {
  459. if err != nil {
  460. return
  461. }
  462. if err = clus.updateRevision(); err != nil {
  463. clus.lg.Warn(
  464. "updateRevision failed",
  465. zap.Error(err),
  466. )
  467. return
  468. }
  469. }()
  470. if err = clus.checker.Check(); err != nil {
  471. clus.lg.Warn(
  472. "consistency check FAIL",
  473. zap.Int("round", clus.rd),
  474. zap.Int("case", clus.cs),
  475. zap.Error(err),
  476. )
  477. return err
  478. }
  479. clus.lg.Info(
  480. "consistency check ALL PASS",
  481. zap.Int("round", clus.rd),
  482. zap.Int("case", clus.cs),
  483. zap.String("desc", clus.failures[clus.cs].Desc()),
  484. )
  485. return err
  486. }
  487. // Bootstrap bootstraps etcd cluster the very first time.
  488. // After this, just continue to call kill/restart.
  489. func (clus *Cluster) Bootstrap() error {
  490. // this is the only time that creates request from scratch
  491. return clus.broadcastOperation(rpcpb.Operation_InitialStartEtcd)
  492. }
  493. // FailArchive sends "FailArchive" operation.
  494. func (clus *Cluster) FailArchive() error {
  495. return clus.broadcastOperation(rpcpb.Operation_FailArchive)
  496. }
  497. // Restart sends "Restart" operation.
  498. func (clus *Cluster) Restart() error {
  499. return clus.broadcastOperation(rpcpb.Operation_RestartEtcd)
  500. }
  501. func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error {
  502. for i := range clus.agentStreams {
  503. err := clus.sendOperation(i, op)
  504. if err != nil {
  505. if op == rpcpb.Operation_DestroyEtcdAgent &&
  506. strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") {
  507. // agent server has already closed;
  508. // so this error is expected
  509. clus.lg.Info(
  510. "successfully destroyed",
  511. zap.String("member", clus.Members[i].EtcdClientEndpoint),
  512. )
  513. continue
  514. }
  515. return err
  516. }
  517. }
  518. return nil
  519. }
  520. func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
  521. if op == rpcpb.Operation_InitialStartEtcd {
  522. clus.agentRequests[idx] = &rpcpb.Request{
  523. Operation: op,
  524. Member: clus.Members[idx],
  525. Tester: clus.Tester,
  526. }
  527. } else {
  528. clus.agentRequests[idx].Operation = op
  529. }
  530. err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
  531. clus.lg.Info(
  532. "sent request",
  533. zap.String("operation", op.String()),
  534. zap.String("to", clus.Members[idx].EtcdClientEndpoint),
  535. zap.Error(err),
  536. )
  537. if err != nil {
  538. return err
  539. }
  540. resp, err := clus.agentStreams[idx].Recv()
  541. if resp != nil {
  542. clus.lg.Info(
  543. "received response",
  544. zap.String("operation", op.String()),
  545. zap.String("from", clus.Members[idx].EtcdClientEndpoint),
  546. zap.Bool("success", resp.Success),
  547. zap.String("status", resp.Status),
  548. zap.Error(err),
  549. )
  550. } else {
  551. clus.lg.Info(
  552. "received empty response",
  553. zap.String("operation", op.String()),
  554. zap.String("from", clus.Members[idx].EtcdClientEndpoint),
  555. zap.Error(err),
  556. )
  557. }
  558. if err != nil {
  559. return err
  560. }
  561. if !resp.Success {
  562. return errors.New(resp.Status)
  563. }
  564. m, secure := clus.Members[idx], false
  565. for _, cu := range m.Etcd.AdvertiseClientURLs {
  566. u, err := url.Parse(cu)
  567. if err != nil {
  568. return err
  569. }
  570. if u.Scheme == "https" { // TODO: handle unix
  571. secure = true
  572. }
  573. }
  574. // store TLS assets from agents/servers onto disk
  575. if secure && (op == rpcpb.Operation_InitialStartEtcd || op == rpcpb.Operation_RestartEtcd) {
  576. dirClient := filepath.Join(
  577. clus.Tester.TesterDataDir,
  578. clus.Members[idx].Etcd.Name,
  579. "fixtures",
  580. "client",
  581. )
  582. if err = fileutil.TouchDirAll(dirClient); err != nil {
  583. return err
  584. }
  585. clientCertData := []byte(resp.Member.ClientCertData)
  586. if len(clientCertData) == 0 {
  587. return fmt.Errorf("got empty client cert from %q", m.EtcdClientEndpoint)
  588. }
  589. clientCertPath := filepath.Join(dirClient, "cert.pem")
  590. if err = ioutil.WriteFile(clientCertPath, clientCertData, 0644); err != nil { // overwrite if exists
  591. return err
  592. }
  593. resp.Member.ClientCertPath = clientCertPath
  594. clus.lg.Info(
  595. "saved client cert file",
  596. zap.String("path", clientCertPath),
  597. )
  598. clientKeyData := []byte(resp.Member.ClientKeyData)
  599. if len(clientKeyData) == 0 {
  600. return fmt.Errorf("got empty client key from %q", m.EtcdClientEndpoint)
  601. }
  602. clientKeyPath := filepath.Join(dirClient, "key.pem")
  603. if err = ioutil.WriteFile(clientKeyPath, clientKeyData, 0644); err != nil { // overwrite if exists
  604. return err
  605. }
  606. resp.Member.ClientKeyPath = clientKeyPath
  607. clus.lg.Info(
  608. "saved client key file",
  609. zap.String("path", clientKeyPath),
  610. )
  611. clientTrustedCAData := []byte(resp.Member.ClientTrustedCAData)
  612. if len(clientTrustedCAData) != 0 {
  613. // TODO: disable this when auto TLS is deprecated
  614. clientTrustedCAPath := filepath.Join(dirClient, "ca.pem")
  615. if err = ioutil.WriteFile(clientTrustedCAPath, clientTrustedCAData, 0644); err != nil { // overwrite if exists
  616. return err
  617. }
  618. resp.Member.ClientTrustedCAPath = clientTrustedCAPath
  619. clus.lg.Info(
  620. "saved client trusted CA file",
  621. zap.String("path", clientTrustedCAPath),
  622. )
  623. }
  624. // no need to store peer certs for tester clients
  625. clus.Members[idx] = resp.Member
  626. }
  627. return nil
  628. }
  629. // DestroyEtcdAgents terminates all tester connections to agents and etcd servers.
  630. func (clus *Cluster) DestroyEtcdAgents() {
  631. err := clus.broadcastOperation(rpcpb.Operation_DestroyEtcdAgent)
  632. if err != nil {
  633. clus.lg.Warn("destroying etcd/agents FAIL", zap.Error(err))
  634. } else {
  635. clus.lg.Info("destroying etcd/agents PASS")
  636. }
  637. for i, conn := range clus.agentConns {
  638. err := conn.Close()
  639. clus.lg.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
  640. }
  641. if clus.testerHTTPServer != nil {
  642. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  643. err := clus.testerHTTPServer.Shutdown(ctx)
  644. cancel()
  645. clus.lg.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr), zap.Error(err))
  646. }
  647. }
  648. // WaitHealth ensures all members are healthy
  649. // by writing a test key to etcd cluster.
  650. func (clus *Cluster) WaitHealth() error {
  651. var err error
  652. // wait 60s to check cluster health.
  653. // TODO: set it to a reasonable value. It is set that high because
  654. // follower may use long time to catch up the leader when reboot under
  655. // reasonable workload (https://github.com/coreos/etcd/issues/2698)
  656. for i := 0; i < 60; i++ {
  657. for _, m := range clus.Members {
  658. if err = m.WriteHealthKey(); err != nil {
  659. clus.lg.Warn(
  660. "health check FAIL",
  661. zap.Int("retries", i),
  662. zap.String("endpoint", m.EtcdClientEndpoint),
  663. zap.Error(err),
  664. )
  665. break
  666. }
  667. clus.lg.Info(
  668. "health check PASS",
  669. zap.Int("retries", i),
  670. zap.String("endpoint", m.EtcdClientEndpoint),
  671. )
  672. }
  673. if err == nil {
  674. clus.lg.Info(
  675. "health check ALL PASS",
  676. zap.Int("round", clus.rd),
  677. zap.Int("case", clus.cs),
  678. )
  679. return nil
  680. }
  681. time.Sleep(time.Second)
  682. }
  683. return err
  684. }
  685. // GetLeader returns the index of leader and error if any.
  686. func (clus *Cluster) GetLeader() (int, error) {
  687. for i, m := range clus.Members {
  688. isLeader, err := m.IsLeader()
  689. if isLeader || err != nil {
  690. return i, err
  691. }
  692. }
  693. return 0, fmt.Errorf("no leader found")
  694. }
  695. // maxRev returns the maximum revision found on the cluster.
  696. func (clus *Cluster) maxRev() (rev int64, err error) {
  697. ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
  698. defer cancel()
  699. revc, errc := make(chan int64, len(clus.Members)), make(chan error, len(clus.Members))
  700. for i := range clus.Members {
  701. go func(m *rpcpb.Member) {
  702. mrev, merr := m.Rev(ctx)
  703. revc <- mrev
  704. errc <- merr
  705. }(clus.Members[i])
  706. }
  707. for i := 0; i < len(clus.Members); i++ {
  708. if merr := <-errc; merr != nil {
  709. err = merr
  710. }
  711. if mrev := <-revc; mrev > rev {
  712. rev = mrev
  713. }
  714. }
  715. return rev, err
  716. }
  717. func (clus *Cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
  718. revs := make(map[string]int64)
  719. hashes := make(map[string]int64)
  720. for _, m := range clus.Members {
  721. rev, hash, err := m.RevHash()
  722. if err != nil {
  723. return nil, nil, err
  724. }
  725. revs[m.EtcdClientEndpoint] = rev
  726. hashes[m.EtcdClientEndpoint] = hash
  727. }
  728. return revs, hashes, nil
  729. }
  730. func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
  731. if rev <= 0 {
  732. return nil
  733. }
  734. for i, m := range clus.Members {
  735. clus.lg.Info(
  736. "compact START",
  737. zap.String("endpoint", m.EtcdClientEndpoint),
  738. zap.Int64("compact-revision", rev),
  739. zap.Duration("timeout", timeout),
  740. )
  741. now := time.Now()
  742. cerr := m.Compact(rev, timeout)
  743. succeed := true
  744. if cerr != nil {
  745. if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {
  746. clus.lg.Info(
  747. "compact error is ignored",
  748. zap.String("endpoint", m.EtcdClientEndpoint),
  749. zap.Int64("compact-revision", rev),
  750. zap.Error(cerr),
  751. )
  752. } else {
  753. clus.lg.Warn(
  754. "compact FAIL",
  755. zap.String("endpoint", m.EtcdClientEndpoint),
  756. zap.Int64("compact-revision", rev),
  757. zap.Error(cerr),
  758. )
  759. err = cerr
  760. succeed = false
  761. }
  762. }
  763. if succeed {
  764. clus.lg.Info(
  765. "compact PASS",
  766. zap.String("endpoint", m.EtcdClientEndpoint),
  767. zap.Int64("compact-revision", rev),
  768. zap.Duration("timeout", timeout),
  769. zap.Duration("took", time.Since(now)),
  770. )
  771. }
  772. }
  773. return err
  774. }
  775. func (clus *Cluster) checkCompact(rev int64) error {
  776. if rev == 0 {
  777. return nil
  778. }
  779. for _, m := range clus.Members {
  780. if err := m.CheckCompact(rev); err != nil {
  781. return err
  782. }
  783. }
  784. return nil
  785. }
  786. func (clus *Cluster) defrag() error {
  787. for _, m := range clus.Members {
  788. if err := m.Defrag(); err != nil {
  789. clus.lg.Warn(
  790. "defrag FAIL",
  791. zap.String("endpoint", m.EtcdClientEndpoint),
  792. zap.Error(err),
  793. )
  794. return err
  795. }
  796. clus.lg.Info(
  797. "defrag PASS",
  798. zap.String("endpoint", m.EtcdClientEndpoint),
  799. )
  800. }
  801. clus.lg.Info(
  802. "defrag ALL PASS",
  803. zap.Int("round", clus.rd),
  804. zap.Int("case", clus.cs),
  805. )
  806. return nil
  807. }
  808. // GetFailureDelayDuration computes failure delay duration.
  809. func (clus *Cluster) GetFailureDelayDuration() time.Duration {
  810. return time.Duration(clus.Tester.FailureDelayMs) * time.Millisecond
  811. }
  812. // Report reports the number of modified keys.
  813. func (clus *Cluster) Report() int64 {
  814. return clus.stresser.ModifiedKeys()
  815. }