cluster.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "io/ioutil"
  20. "math/rand"
  21. "net/http"
  22. "path/filepath"
  23. "strings"
  24. "time"
  25. "github.com/coreos/etcd/pkg/debugutil"
  26. "github.com/coreos/etcd/tools/functional-tester/rpcpb"
  27. "github.com/prometheus/client_golang/prometheus/promhttp"
  28. "go.uber.org/zap"
  29. "golang.org/x/time/rate"
  30. "google.golang.org/grpc"
  31. yaml "gopkg.in/yaml.v2"
  32. )
// Cluster defines tester cluster.
//
// The exported fields are decoded from the YAML configuration file (see the
// struct tags); the unexported fields hold runtime state that is set up by
// NewCluster and the methods below.
type Cluster struct {
	// lg is the logger used by every method of the cluster.
	lg *zap.Logger

	// Per-agent state. NewCluster allocates each of these slices with
	// len(Members) entries, so index i refers to Members[i].
	agentConns   []*grpc.ClientConn
	agentClients []rpcpb.TransportClient
	agentStreams []rpcpb.Transport_TransportClient
	// agentRequests holds the request object re-sent to each agent; it is
	// filled in by sendOperation.
	agentRequests []*rpcpb.Request

	// testerHTTPServer serves "/metrics" and, when enabled, the pprof handlers.
	testerHTTPServer *http.Server

	Members []*rpcpb.Member `yaml:"agent-configs"`
	Tester  *rpcpb.Tester   `yaml:"tester-config"`

	// failures is the list of failure cases built by updateFailures from
	// Tester.FailureCases.
	failures []Failure

	// rateLimiter is created in NewCluster from Tester.StressQPS.
	rateLimiter *rate.Limiter
	// stresser and checker are (re)built by updateStresserChecker.
	stresser Stresser
	checker  Checker

	// NOTE(review): currentRevision is not written in this part of the file;
	// presumably it is maintained by updateRevision — confirm.
	currentRevision int64
	// rd and cs are logged as "round" and "case"; cs also indexes failures
	// in checkConsistency.
	rd int
	cs int
}
  51. func newCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
  52. lg.Info("reading configuration file", zap.String("path", fpath))
  53. bts, err := ioutil.ReadFile(fpath)
  54. if err != nil {
  55. return nil, err
  56. }
  57. lg.Info("opened configuration file", zap.String("path", fpath))
  58. clus := &Cluster{lg: lg}
  59. if err = yaml.Unmarshal(bts, clus); err != nil {
  60. return nil, err
  61. }
  62. for i := range clus.Members {
  63. if clus.Members[i].BaseDir == "" {
  64. return nil, fmt.Errorf("Members[i].BaseDir cannot be empty (got %q)", clus.Members[i].BaseDir)
  65. }
  66. if clus.Members[i].EtcdLogPath == "" {
  67. return nil, fmt.Errorf("Members[i].EtcdLogPath cannot be empty (got %q)", clus.Members[i].EtcdLogPath)
  68. }
  69. if clus.Members[i].Etcd.Name == "" {
  70. return nil, fmt.Errorf("'--name' cannot be empty (got %+v)", clus.Members[i])
  71. }
  72. if clus.Members[i].Etcd.DataDir == "" {
  73. return nil, fmt.Errorf("'--data-dir' cannot be empty (got %+v)", clus.Members[i])
  74. }
  75. if clus.Members[i].Etcd.SnapshotCount == 0 {
  76. return nil, fmt.Errorf("'--snapshot-count' cannot be 0 (got %+v)", clus.Members[i].Etcd.SnapshotCount)
  77. }
  78. if clus.Members[i].Etcd.DataDir == "" {
  79. return nil, fmt.Errorf("'--data-dir' cannot be empty (got %q)", clus.Members[i].Etcd.DataDir)
  80. }
  81. if clus.Members[i].Etcd.WALDir == "" {
  82. clus.Members[i].Etcd.WALDir = filepath.Join(clus.Members[i].Etcd.DataDir, "member", "wal")
  83. }
  84. port := ""
  85. listenClientPorts := make([]string, len(clus.Members))
  86. for i, u := range clus.Members[i].Etcd.ListenClientURLs {
  87. if !isValidURL(u) {
  88. return nil, fmt.Errorf("'--listen-client-urls' has valid URL %q", u)
  89. }
  90. listenClientPorts[i], err = getPort(u)
  91. if err != nil {
  92. return nil, fmt.Errorf("'--listen-client-urls' has no port %q", u)
  93. }
  94. }
  95. for i, u := range clus.Members[i].Etcd.AdvertiseClientURLs {
  96. if !isValidURL(u) {
  97. return nil, fmt.Errorf("'--advertise-client-urls' has valid URL %q", u)
  98. }
  99. port, err = getPort(u)
  100. if err != nil {
  101. return nil, fmt.Errorf("'--advertise-client-urls' has no port %q", u)
  102. }
  103. if clus.Members[i].EtcdClientProxy && listenClientPorts[i] == port {
  104. return nil, fmt.Errorf("clus.Members[%d] requires client port proxy, but advertise port %q conflicts with listener port %q", i, port, listenClientPorts[i])
  105. }
  106. }
  107. listenPeerPorts := make([]string, len(clus.Members))
  108. for i, u := range clus.Members[i].Etcd.ListenPeerURLs {
  109. if !isValidURL(u) {
  110. return nil, fmt.Errorf("'--listen-peer-urls' has valid URL %q", u)
  111. }
  112. listenPeerPorts[i], err = getPort(u)
  113. if err != nil {
  114. return nil, fmt.Errorf("'--listen-peer-urls' has no port %q", u)
  115. }
  116. }
  117. for i, u := range clus.Members[i].Etcd.InitialAdvertisePeerURLs {
  118. if !isValidURL(u) {
  119. return nil, fmt.Errorf("'--initial-advertise-peer-urls' has valid URL %q", u)
  120. }
  121. port, err = getPort(u)
  122. if err != nil {
  123. return nil, fmt.Errorf("'--initial-advertise-peer-urls' has no port %q", u)
  124. }
  125. if clus.Members[i].EtcdPeerProxy && listenPeerPorts[i] == port {
  126. return nil, fmt.Errorf("clus.Members[%d] requires peer port proxy, but advertise port %q conflicts with listener port %q", i, port, listenPeerPorts[i])
  127. }
  128. }
  129. if !strings.HasPrefix(clus.Members[i].EtcdLogPath, clus.Members[i].BaseDir) {
  130. return nil, fmt.Errorf("EtcdLogPath must be prefixed with BaseDir (got %q)", clus.Members[i].EtcdLogPath)
  131. }
  132. if !strings.HasPrefix(clus.Members[i].Etcd.DataDir, clus.Members[i].BaseDir) {
  133. return nil, fmt.Errorf("Etcd.DataDir must be prefixed with BaseDir (got %q)", clus.Members[i].Etcd.DataDir)
  134. }
  135. // TODO: support separate WALDir that can be handled via failure-archive
  136. if !strings.HasPrefix(clus.Members[i].Etcd.WALDir, clus.Members[i].BaseDir) {
  137. return nil, fmt.Errorf("Etcd.WALDir must be prefixed with BaseDir (got %q)", clus.Members[i].Etcd.WALDir)
  138. }
  139. if len(clus.Tester.FailureCases) == 0 {
  140. return nil, errors.New("FailureCases not found")
  141. }
  142. }
  143. for _, v := range clus.Tester.FailureCases {
  144. if _, ok := rpcpb.FailureCase_value[v]; !ok {
  145. return nil, fmt.Errorf("%q is not defined in 'rpcpb.FailureCase_value'", v)
  146. }
  147. }
  148. for _, v := range clus.Tester.StressTypes {
  149. if _, ok := rpcpb.StressType_value[v]; !ok {
  150. return nil, fmt.Errorf("StressType is unknown; got %q", v)
  151. }
  152. }
  153. if clus.Tester.StressKeySuffixRangeTxn > 100 {
  154. return nil, fmt.Errorf("StressKeySuffixRangeTxn maximum value is 100, got %v", clus.Tester.StressKeySuffixRangeTxn)
  155. }
  156. if clus.Tester.StressKeyTxnOps > 64 {
  157. return nil, fmt.Errorf("StressKeyTxnOps maximum value is 64, got %v", clus.Tester.StressKeyTxnOps)
  158. }
  159. return clus, err
  160. }
  161. // TODO: status handler
// dialOpts are the gRPC dial options NewCluster passes to grpc.Dial for every
// agent connection.
var dialOpts = []grpc.DialOption{
	// no transport security on the tester-to-agent connection
	grpc.WithInsecure(),
	// upper bound on how long a dial may take
	grpc.WithTimeout(5 * time.Second),
	// make Dial wait for the connection instead of returning immediately
	grpc.WithBlock(),
}
  167. // NewCluster creates a client from a tester configuration.
  168. func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
  169. clus, err := newCluster(lg, fpath)
  170. if err != nil {
  171. return nil, err
  172. }
  173. clus.agentConns = make([]*grpc.ClientConn, len(clus.Members))
  174. clus.agentClients = make([]rpcpb.TransportClient, len(clus.Members))
  175. clus.agentStreams = make([]rpcpb.Transport_TransportClient, len(clus.Members))
  176. clus.agentRequests = make([]*rpcpb.Request, len(clus.Members))
  177. clus.failures = make([]Failure, 0)
  178. for i, ap := range clus.Members {
  179. clus.lg.Info("connecting", zap.String("agent-address", ap.AgentAddr))
  180. var err error
  181. clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...)
  182. if err != nil {
  183. return nil, err
  184. }
  185. clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i])
  186. clus.lg.Info("connected", zap.String("agent-address", ap.AgentAddr))
  187. clus.lg.Info("creating stream", zap.String("agent-address", ap.AgentAddr))
  188. clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background())
  189. if err != nil {
  190. return nil, err
  191. }
  192. clus.lg.Info("created stream", zap.String("agent-address", ap.AgentAddr))
  193. }
  194. mux := http.NewServeMux()
  195. mux.Handle("/metrics", promhttp.Handler())
  196. if clus.Tester.EnablePprof {
  197. for p, h := range debugutil.PProfHandlers() {
  198. mux.Handle(p, h)
  199. }
  200. }
  201. clus.testerHTTPServer = &http.Server{
  202. Addr: clus.Tester.TesterAddr,
  203. Handler: mux,
  204. }
  205. go clus.serveTesterServer()
  206. clus.updateFailures()
  207. clus.rateLimiter = rate.NewLimiter(
  208. rate.Limit(int(clus.Tester.StressQPS)),
  209. int(clus.Tester.StressQPS),
  210. )
  211. clus.updateStresserChecker()
  212. return clus, nil
  213. }
  214. func (clus *Cluster) serveTesterServer() {
  215. clus.lg.Info(
  216. "started tester HTTP server",
  217. zap.String("tester-address", clus.Tester.TesterAddr),
  218. )
  219. err := clus.testerHTTPServer.ListenAndServe()
  220. clus.lg.Info(
  221. "tester HTTP server returned",
  222. zap.String("tester-address", clus.Tester.TesterAddr),
  223. zap.Error(err),
  224. )
  225. if err != nil && err != http.ErrServerClosed {
  226. clus.lg.Fatal("tester HTTP errored", zap.Error(err))
  227. }
  228. }
  229. func (clus *Cluster) updateFailures() {
  230. for _, cs := range clus.Tester.FailureCases {
  231. switch cs {
  232. case "KILL_ONE_FOLLOWER":
  233. clus.failures = append(clus.failures, newFailureKillOneFollower())
  234. case "KILL_LEADER":
  235. clus.failures = append(clus.failures, newFailureKillLeader())
  236. case "KILL_ONE_FOLLOWER_FOR_LONG":
  237. clus.failures = append(clus.failures, newFailureKillOneFollowerForLongTime())
  238. case "KILL_LEADER_FOR_LONG":
  239. clus.failures = append(clus.failures, newFailureKillLeaderForLongTime())
  240. case "KILL_QUORUM":
  241. clus.failures = append(clus.failures, newFailureKillQuorum())
  242. case "KILL_ALL":
  243. clus.failures = append(clus.failures, newFailureKillAll())
  244. case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER":
  245. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollower(clus))
  246. case "BLACKHOLE_PEER_PORT_TX_RX_LEADER":
  247. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeader(clus))
  248. case "BLACKHOLE_PEER_PORT_TX_RX_ALL":
  249. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxAll(clus))
  250. case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
  251. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus))
  252. case "DELAY_PEER_PORT_TX_RX_LEADER":
  253. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus))
  254. case "DELAY_PEER_PORT_TX_RX_ALL":
  255. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus))
  256. case "FAILPOINTS":
  257. fpFailures, fperr := failpointFailures(clus)
  258. if len(fpFailures) == 0 {
  259. clus.lg.Info("no failpoints found!", zap.Error(fperr))
  260. }
  261. clus.failures = append(clus.failures, fpFailures...)
  262. case "NO_FAIL_WITH_STRESS":
  263. clus.failures = append(clus.failures, newFailureNoFailWithStress(clus))
  264. case "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS":
  265. clus.failures = append(clus.failures, newFailureNoFailWithNoStressForLiveness(clus))
  266. case "EXTERNAL":
  267. clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath))
  268. }
  269. }
  270. }
  271. func (clus *Cluster) failureStrings() (fs []string) {
  272. fs = make([]string, len(clus.failures))
  273. for i := range clus.failures {
  274. fs[i] = clus.failures[i].Desc()
  275. }
  276. return fs
  277. }
  278. func (clus *Cluster) shuffleFailures() {
  279. rand.Seed(time.Now().UnixNano())
  280. offset := rand.Intn(1000)
  281. n := len(clus.failures)
  282. cp := coprime(n)
  283. clus.lg.Info("shuffling test failure cases", zap.Int("total", n))
  284. fs := make([]Failure, n)
  285. for i := 0; i < n; i++ {
  286. fs[i] = clus.failures[(cp*i+offset)%n]
  287. }
  288. clus.failures = fs
  289. clus.lg.Info("shuffled test failure cases", zap.Int("total", n))
  290. }
  291. /*
  292. x and y of GCD 1 are coprime to each other
  293. x1 = ( coprime of n * idx1 + offset ) % n
  294. x2 = ( coprime of n * idx2 + offset ) % n
  295. (x2 - x1) = coprime of n * (idx2 - idx1) % n
  296. = (idx2 - idx1) = 1
  297. Consecutive x's are guaranteed to be distinct
  298. */
  299. func coprime(n int) int {
  300. coprime := 1
  301. for i := n / 2; i < n; i++ {
  302. if gcd(i, n) == 1 {
  303. coprime = i
  304. break
  305. }
  306. }
  307. return coprime
  308. }
  309. func gcd(x, y int) int {
  310. if y == 0 {
  311. return x
  312. }
  313. return gcd(y, x%y)
  314. }
  315. func (clus *Cluster) updateStresserChecker() {
  316. clus.lg.Info(
  317. "updating stressers",
  318. zap.Int("round", clus.rd),
  319. zap.Int("case", clus.cs),
  320. )
  321. cs := &compositeStresser{}
  322. for _, m := range clus.Members {
  323. cs.stressers = append(cs.stressers, newStresser(clus, m))
  324. }
  325. clus.stresser = cs
  326. if clus.Tester.ConsistencyCheck {
  327. clus.checker = newHashChecker(clus.lg, hashAndRevGetter(clus))
  328. if schk := cs.Checker(); schk != nil {
  329. clus.checker = newCompositeChecker([]Checker{clus.checker, schk})
  330. }
  331. } else {
  332. clus.checker = newNoChecker()
  333. }
  334. clus.lg.Info(
  335. "updated stressers",
  336. zap.Int("round", clus.rd),
  337. zap.Int("case", clus.cs),
  338. )
  339. }
  340. func (clus *Cluster) startStresser() (err error) {
  341. clus.lg.Info(
  342. "starting stressers",
  343. zap.Int("round", clus.rd),
  344. zap.Int("case", clus.cs),
  345. )
  346. err = clus.stresser.Stress()
  347. clus.lg.Info(
  348. "started stressers",
  349. zap.Int("round", clus.rd),
  350. zap.Int("case", clus.cs),
  351. )
  352. return err
  353. }
  354. func (clus *Cluster) closeStresser() {
  355. clus.lg.Info(
  356. "closing stressers",
  357. zap.Int("round", clus.rd),
  358. zap.Int("case", clus.cs),
  359. )
  360. clus.stresser.Close()
  361. clus.lg.Info(
  362. "closed stressers",
  363. zap.Int("round", clus.rd),
  364. zap.Int("case", clus.cs),
  365. )
  366. }
  367. func (clus *Cluster) pauseStresser() {
  368. clus.lg.Info(
  369. "pausing stressers",
  370. zap.Int("round", clus.rd),
  371. zap.Int("case", clus.cs),
  372. )
  373. clus.stresser.Pause()
  374. clus.lg.Info(
  375. "paused stressers",
  376. zap.Int("round", clus.rd),
  377. zap.Int("case", clus.cs),
  378. )
  379. }
  380. func (clus *Cluster) checkConsistency() (err error) {
  381. defer func() {
  382. if err != nil {
  383. return
  384. }
  385. if err = clus.updateRevision(); err != nil {
  386. clus.lg.Warn(
  387. "updateRevision failed",
  388. zap.Error(err),
  389. )
  390. return
  391. }
  392. err = clus.startStresser()
  393. }()
  394. clus.lg.Info(
  395. "checking consistency and invariant of cluster",
  396. zap.Int("round", clus.rd),
  397. zap.Int("case", clus.cs),
  398. zap.String("desc", clus.failures[clus.cs].Desc()),
  399. )
  400. if err = clus.checker.Check(); err != nil {
  401. clus.lg.Warn(
  402. "checker.Check failed",
  403. zap.Error(err),
  404. )
  405. return err
  406. }
  407. clus.lg.Info(
  408. "checked consistency and invariant of cluster",
  409. zap.Int("round", clus.rd),
  410. zap.Int("case", clus.cs),
  411. zap.String("desc", clus.failures[clus.cs].Desc()),
  412. )
  413. return err
  414. }
// Bootstrap bootstraps etcd cluster the very first time.
// After this, just continue to call kill/restart.
//
// It broadcasts Operation_InitialStartEtcd to every agent stream and returns
// the error from broadcastOperation, if any.
func (clus *Cluster) Bootstrap() error {
	// InitialStartEtcd is the operation for which sendOperation builds each
	// agent's request from scratch
	return clus.broadcastOperation(rpcpb.Operation_InitialStartEtcd)
}
// FailArchive sends "FailArchive" operation.
//
// The operation is broadcast to every agent stream; the error from
// broadcastOperation, if any, is returned.
func (clus *Cluster) FailArchive() error {
	return clus.broadcastOperation(rpcpb.Operation_FailArchive)
}
// Restart sends "Restart" operation.
//
// Operation_RestartEtcd is broadcast to every agent stream; the error from
// broadcastOperation, if any, is returned.
func (clus *Cluster) Restart() error {
	return clus.broadcastOperation(rpcpb.Operation_RestartEtcd)
}
  429. func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error {
  430. for i := range clus.agentStreams {
  431. err := clus.sendOperation(i, op)
  432. if err != nil {
  433. if op == rpcpb.Operation_DestroyEtcdAgent &&
  434. strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") {
  435. // agent server has already closed;
  436. // so this error is expected
  437. clus.lg.Info(
  438. "successfully destroyed",
  439. zap.String("member", clus.Members[i].EtcdClientEndpoint),
  440. )
  441. continue
  442. }
  443. return err
  444. }
  445. }
  446. return nil
  447. }
  448. func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
  449. if op == rpcpb.Operation_InitialStartEtcd {
  450. clus.agentRequests[idx] = &rpcpb.Request{
  451. Operation: op,
  452. Member: clus.Members[idx],
  453. Tester: clus.Tester,
  454. }
  455. } else {
  456. clus.agentRequests[idx].Operation = op
  457. }
  458. clus.lg.Info(
  459. "sending request",
  460. zap.String("operation", op.String()),
  461. zap.String("to", clus.Members[idx].EtcdClientEndpoint),
  462. )
  463. err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
  464. clus.lg.Info(
  465. "sent request",
  466. zap.String("operation", op.String()),
  467. zap.String("to", clus.Members[idx].EtcdClientEndpoint),
  468. zap.Error(err),
  469. )
  470. if err != nil {
  471. return err
  472. }
  473. clus.lg.Info(
  474. "receiving response",
  475. zap.String("operation", op.String()),
  476. zap.String("from", clus.Members[idx].EtcdClientEndpoint),
  477. )
  478. resp, err := clus.agentStreams[idx].Recv()
  479. if resp != nil {
  480. clus.lg.Info(
  481. "received response",
  482. zap.String("operation", op.String()),
  483. zap.String("from", clus.Members[idx].EtcdClientEndpoint),
  484. zap.Bool("success", resp.Success),
  485. zap.String("status", resp.Status),
  486. zap.Error(err),
  487. )
  488. } else {
  489. clus.lg.Info(
  490. "received empty response",
  491. zap.String("operation", op.String()),
  492. zap.String("from", clus.Members[idx].EtcdClientEndpoint),
  493. zap.Error(err),
  494. )
  495. }
  496. if err != nil {
  497. return err
  498. }
  499. if !resp.Success {
  500. err = errors.New(resp.Status)
  501. }
  502. return err
  503. }
  504. // DestroyEtcdAgents terminates all tester connections to agents and etcd servers.
  505. func (clus *Cluster) DestroyEtcdAgents() {
  506. clus.lg.Info("destroying etcd servers and agents")
  507. err := clus.broadcastOperation(rpcpb.Operation_DestroyEtcdAgent)
  508. if err != nil {
  509. clus.lg.Warn("failed to destroy etcd servers and agents", zap.Error(err))
  510. } else {
  511. clus.lg.Info("destroyed etcd servers and agents")
  512. }
  513. for i, conn := range clus.agentConns {
  514. clus.lg.Info("closing connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr))
  515. err := conn.Close()
  516. clus.lg.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
  517. }
  518. if clus.testerHTTPServer != nil {
  519. clus.lg.Info("closing tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr))
  520. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  521. err := clus.testerHTTPServer.Shutdown(ctx)
  522. cancel()
  523. clus.lg.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr), zap.Error(err))
  524. }
  525. }
  526. // WaitHealth ensures all members are healthy
  527. // by writing a test key to etcd cluster.
  528. func (clus *Cluster) WaitHealth() error {
  529. var err error
  530. // wait 60s to check cluster health.
  531. // TODO: set it to a reasonable value. It is set that high because
  532. // follower may use long time to catch up the leader when reboot under
  533. // reasonable workload (https://github.com/coreos/etcd/issues/2698)
  534. for i := 0; i < 60; i++ {
  535. for _, m := range clus.Members {
  536. clus.lg.Info(
  537. "writing health key",
  538. zap.Int("retries", i),
  539. zap.String("endpoint", m.EtcdClientEndpoint),
  540. )
  541. if err = m.WriteHealthKey(); err != nil {
  542. clus.lg.Warn(
  543. "writing health key failed",
  544. zap.Int("retries", i),
  545. zap.String("endpoint", m.EtcdClientEndpoint),
  546. zap.Error(err),
  547. )
  548. break
  549. }
  550. clus.lg.Info(
  551. "wrote health key",
  552. zap.Int("retries", i),
  553. zap.String("endpoint", m.EtcdClientEndpoint),
  554. )
  555. }
  556. if err == nil {
  557. clus.lg.Info(
  558. "writing health key success on all members",
  559. zap.Int("retries", i),
  560. )
  561. return nil
  562. }
  563. time.Sleep(time.Second)
  564. }
  565. return err
  566. }
  567. // GetLeader returns the index of leader and error if any.
  568. func (clus *Cluster) GetLeader() (int, error) {
  569. for i, m := range clus.Members {
  570. isLeader, err := m.IsLeader()
  571. if isLeader || err != nil {
  572. return i, err
  573. }
  574. }
  575. return 0, fmt.Errorf("no leader found")
  576. }
  577. // maxRev returns the maximum revision found on the cluster.
  578. func (clus *Cluster) maxRev() (rev int64, err error) {
  579. ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
  580. defer cancel()
  581. revc, errc := make(chan int64, len(clus.Members)), make(chan error, len(clus.Members))
  582. for i := range clus.Members {
  583. go func(m *rpcpb.Member) {
  584. mrev, merr := m.Rev(ctx)
  585. revc <- mrev
  586. errc <- merr
  587. }(clus.Members[i])
  588. }
  589. for i := 0; i < len(clus.Members); i++ {
  590. if merr := <-errc; merr != nil {
  591. err = merr
  592. }
  593. if mrev := <-revc; mrev > rev {
  594. rev = mrev
  595. }
  596. }
  597. return rev, err
  598. }
  599. func (clus *Cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
  600. revs := make(map[string]int64)
  601. hashes := make(map[string]int64)
  602. for _, m := range clus.Members {
  603. rev, hash, err := m.RevHash()
  604. if err != nil {
  605. return nil, nil, err
  606. }
  607. revs[m.EtcdClientEndpoint] = rev
  608. hashes[m.EtcdClientEndpoint] = hash
  609. }
  610. return revs, hashes, nil
  611. }
  612. func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
  613. if rev <= 0 {
  614. return nil
  615. }
  616. for i, m := range clus.Members {
  617. clus.lg.Info(
  618. "compacting",
  619. zap.String("endpoint", m.EtcdClientEndpoint),
  620. zap.Int64("compact-revision", rev),
  621. zap.Duration("timeout", timeout),
  622. )
  623. now := time.Now()
  624. cerr := m.Compact(rev, timeout)
  625. succeed := true
  626. if cerr != nil {
  627. if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {
  628. clus.lg.Info(
  629. "compact error is ignored",
  630. zap.String("endpoint", m.EtcdClientEndpoint),
  631. zap.Int64("compact-revision", rev),
  632. zap.Error(cerr),
  633. )
  634. } else {
  635. clus.lg.Warn(
  636. "compact failed",
  637. zap.String("endpoint", m.EtcdClientEndpoint),
  638. zap.Int64("compact-revision", rev),
  639. zap.Error(cerr),
  640. )
  641. err = cerr
  642. succeed = false
  643. }
  644. }
  645. if succeed {
  646. clus.lg.Info(
  647. "compacted",
  648. zap.String("endpoint", m.EtcdClientEndpoint),
  649. zap.Int64("compact-revision", rev),
  650. zap.Duration("timeout", timeout),
  651. zap.Duration("took", time.Since(now)),
  652. )
  653. }
  654. }
  655. return err
  656. }
  657. func (clus *Cluster) checkCompact(rev int64) error {
  658. if rev == 0 {
  659. return nil
  660. }
  661. for _, m := range clus.Members {
  662. if err := m.CheckCompact(rev); err != nil {
  663. return err
  664. }
  665. }
  666. return nil
  667. }
  668. func (clus *Cluster) defrag() error {
  669. clus.lg.Info(
  670. "defragmenting",
  671. zap.Int("round", clus.rd),
  672. zap.Int("case", clus.cs),
  673. )
  674. for _, m := range clus.Members {
  675. if err := m.Defrag(); err != nil {
  676. clus.lg.Warn(
  677. "defrag failed",
  678. zap.Int("round", clus.rd),
  679. zap.Int("case", clus.cs),
  680. zap.Error(err),
  681. )
  682. return err
  683. }
  684. }
  685. clus.lg.Info(
  686. "defragmented",
  687. zap.Int("round", clus.rd),
  688. zap.Int("case", clus.cs),
  689. )
  690. return nil
  691. }
  692. // GetFailureDelayDuration computes failure delay duration.
  693. func (clus *Cluster) GetFailureDelayDuration() time.Duration {
  694. return time.Duration(clus.Tester.FailureDelayMs) * time.Millisecond
  695. }
// Report reports the number of modified keys.
//
// The value is whatever the current stresser returns from ModifiedKeys.
func (clus *Cluster) Report() int64 {
	return clus.stresser.ModifiedKeys()
}