cluster.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "io/ioutil"
  20. "math/rand"
  21. "net/http"
  22. "net/url"
  23. "path/filepath"
  24. "strings"
  25. "sync"
  26. "time"
  27. "github.com/coreos/etcd/functional/rpcpb"
  28. "github.com/coreos/etcd/pkg/debugutil"
  29. "github.com/coreos/etcd/pkg/fileutil"
  30. "github.com/prometheus/client_golang/prometheus/promhttp"
  31. "go.uber.org/zap"
  32. "golang.org/x/time/rate"
  33. "google.golang.org/grpc"
  34. )
// Cluster defines tester cluster.
type Cluster struct {
	lg *zap.Logger

	// agentConns, agentClients, agentStreams, and agentRequests are all
	// indexed by member; agentRequests[i] is built once on InitialStartEtcd
	// and reused (only the Operation field is swapped) for later operations.
	agentConns       []*grpc.ClientConn
	agentClients     []rpcpb.TransportClient
	agentStreams     []rpcpb.Transport_TransportClient
	agentRequests    []*rpcpb.Request
	// testerHTTPServer serves /metrics (and pprof when enabled).
	testerHTTPServer *http.Server

	// Members and Tester are populated from the YAML configuration file.
	Members []*rpcpb.Member `yaml:"agent-configs"`
	Tester  *rpcpb.Tester   `yaml:"tester-config"`

	// failures holds the failure-injection cases built from
	// Tester.FailureCases; see updateFailures.
	failures    []Failure
	rateLimiter *rate.Limiter
	stresser    Stresser
	checker     Checker

	// currentRevision caches the last observed cluster revision.
	currentRevision int64
	// rd and cs are the current round and case indices, used in logs.
	rd int
	cs int
}
// dialOpts are the gRPC options used for every agent connection:
// plaintext transport, and a blocking dial (WithBlock) bounded to
// 5 seconds (WithTimeout) so connection failures surface immediately
// in NewCluster rather than lazily on first RPC.
var dialOpts = []grpc.DialOption{
	grpc.WithInsecure(),
	grpc.WithTimeout(5 * time.Second),
	grpc.WithBlock(),
}
  58. // NewCluster creates a client from a tester configuration.
  59. func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
  60. clus, err := read(lg, fpath)
  61. if err != nil {
  62. return nil, err
  63. }
  64. clus.agentConns = make([]*grpc.ClientConn, len(clus.Members))
  65. clus.agentClients = make([]rpcpb.TransportClient, len(clus.Members))
  66. clus.agentStreams = make([]rpcpb.Transport_TransportClient, len(clus.Members))
  67. clus.agentRequests = make([]*rpcpb.Request, len(clus.Members))
  68. clus.failures = make([]Failure, 0)
  69. for i, ap := range clus.Members {
  70. var err error
  71. clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...)
  72. if err != nil {
  73. return nil, err
  74. }
  75. clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i])
  76. clus.lg.Info("connected", zap.String("agent-address", ap.AgentAddr))
  77. clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background())
  78. if err != nil {
  79. return nil, err
  80. }
  81. clus.lg.Info("created stream", zap.String("agent-address", ap.AgentAddr))
  82. }
  83. mux := http.NewServeMux()
  84. mux.Handle("/metrics", promhttp.Handler())
  85. if clus.Tester.EnablePprof {
  86. for p, h := range debugutil.PProfHandlers() {
  87. mux.Handle(p, h)
  88. }
  89. }
  90. clus.testerHTTPServer = &http.Server{
  91. Addr: clus.Tester.Addr,
  92. Handler: mux,
  93. }
  94. go clus.serveTesterServer()
  95. clus.updateFailures()
  96. clus.rateLimiter = rate.NewLimiter(
  97. rate.Limit(int(clus.Tester.StressQPS)),
  98. int(clus.Tester.StressQPS),
  99. )
  100. clus.updateStresserChecker()
  101. return clus, nil
  102. }
  103. func (clus *Cluster) serveTesterServer() {
  104. clus.lg.Info(
  105. "started tester HTTP server",
  106. zap.String("tester-address", clus.Tester.Addr),
  107. )
  108. err := clus.testerHTTPServer.ListenAndServe()
  109. clus.lg.Info(
  110. "tester HTTP server returned",
  111. zap.String("tester-address", clus.Tester.Addr),
  112. zap.Error(err),
  113. )
  114. if err != nil && err != http.ErrServerClosed {
  115. clus.lg.Fatal("tester HTTP errored", zap.Error(err))
  116. }
  117. }
  118. func (clus *Cluster) updateFailures() {
  119. for _, cs := range clus.Tester.FailureCases {
  120. switch cs {
  121. case "KILL_ONE_FOLLOWER":
  122. clus.failures = append(clus.failures, newFailureKillOneFollower())
  123. case "KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
  124. clus.failures = append(clus.failures, newFailureKillOneFollowerUntilTriggerSnapshot())
  125. case "KILL_LEADER":
  126. clus.failures = append(clus.failures, newFailureKillLeader())
  127. case "KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT":
  128. clus.failures = append(clus.failures, newFailureKillLeaderUntilTriggerSnapshot())
  129. case "KILL_QUORUM":
  130. clus.failures = append(clus.failures, newFailureKillQuorum())
  131. case "KILL_ALL":
  132. clus.failures = append(clus.failures, newFailureKillAll())
  133. case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER":
  134. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollower(clus))
  135. case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
  136. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollowerUntilTriggerSnapshot())
  137. case "BLACKHOLE_PEER_PORT_TX_RX_LEADER":
  138. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeader(clus))
  139. case "BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
  140. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeaderUntilTriggerSnapshot())
  141. case "BLACKHOLE_PEER_PORT_TX_RX_QUORUM":
  142. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxQuorum(clus))
  143. case "BLACKHOLE_PEER_PORT_TX_RX_ALL":
  144. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxAll(clus))
  145. case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
  146. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus, false))
  147. case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
  148. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus, true))
  149. case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
  150. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus, false))
  151. case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
  152. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus, true))
  153. case "DELAY_PEER_PORT_TX_RX_LEADER":
  154. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus, false))
  155. case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER":
  156. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus, true))
  157. case "DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
  158. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus, false))
  159. case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
  160. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus, true))
  161. case "DELAY_PEER_PORT_TX_RX_QUORUM":
  162. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxQuorum(clus, false))
  163. case "RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM":
  164. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxQuorum(clus, true))
  165. case "DELAY_PEER_PORT_TX_RX_ALL":
  166. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus, false))
  167. case "RANDOM_DELAY_PEER_PORT_TX_RX_ALL":
  168. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus, true))
  169. case "NO_FAIL_WITH_STRESS":
  170. clus.failures = append(clus.failures, newFailureNoFailWithStress(clus))
  171. case "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS":
  172. clus.failures = append(clus.failures, newFailureNoFailWithNoStressForLiveness(clus))
  173. case "EXTERNAL":
  174. clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath))
  175. case "FAILPOINTS":
  176. fpFailures, fperr := failpointFailures(clus)
  177. if len(fpFailures) == 0 {
  178. clus.lg.Info("no failpoints found!", zap.Error(fperr))
  179. }
  180. clus.failures = append(clus.failures, fpFailures...)
  181. }
  182. }
  183. }
  184. func (clus *Cluster) failureStrings() (fs []string) {
  185. fs = make([]string, len(clus.failures))
  186. for i := range clus.failures {
  187. fs[i] = clus.failures[i].Desc()
  188. }
  189. return fs
  190. }
  191. // UpdateDelayLatencyMs updates delay latency with random value
  192. // within election timeout.
  193. func (clus *Cluster) UpdateDelayLatencyMs() {
  194. rand.Seed(time.Now().UnixNano())
  195. clus.Tester.UpdatedDelayLatencyMs = uint32(rand.Int63n(clus.Members[0].Etcd.ElectionTimeoutMs))
  196. minLatRv := clus.Tester.DelayLatencyMsRv + clus.Tester.DelayLatencyMsRv/5
  197. if clus.Tester.UpdatedDelayLatencyMs <= minLatRv {
  198. clus.Tester.UpdatedDelayLatencyMs += minLatRv
  199. }
  200. }
  201. func (clus *Cluster) updateStresserChecker() {
  202. cs := &compositeStresser{}
  203. for _, m := range clus.Members {
  204. cs.stressers = append(cs.stressers, newStresser(clus, m))
  205. }
  206. clus.stresser = cs
  207. if clus.Tester.ConsistencyCheck {
  208. clus.checker = newHashChecker(clus.lg, hashAndRevGetter(clus))
  209. if schk := cs.Checker(); schk != nil {
  210. clus.checker = newCompositeChecker([]Checker{clus.checker, schk})
  211. }
  212. } else {
  213. clus.checker = newNoChecker()
  214. }
  215. clus.lg.Info(
  216. "updated stressers",
  217. zap.Int("round", clus.rd),
  218. zap.Int("case", clus.cs),
  219. )
  220. }
// checkConsistency runs the configured checker and, only when the check
// passes, refreshes the cached cluster revision via updateRevision.
func (clus *Cluster) checkConsistency() (err error) {
	// NOTE: the deferred closure reads and may overwrite the named result.
	// It runs updateRevision only when Check succeeded (err == nil), so a
	// failed consistency check is never masked by a revision update error.
	defer func() {
		if err != nil {
			return
		}
		if err = clus.updateRevision(); err != nil {
			clus.lg.Warn(
				"updateRevision failed",
				zap.Error(err),
			)
			return
		}
	}()
	if err = clus.checker.Check(); err != nil {
		clus.lg.Warn(
			"consistency check FAIL",
			zap.Int("round", clus.rd),
			zap.Int("case", clus.cs),
			zap.Error(err),
		)
		return err
	}
	clus.lg.Info(
		"consistency check ALL PASS",
		zap.Int("round", clus.rd),
		zap.Int("case", clus.cs),
		zap.String("desc", clus.failures[clus.cs].Desc()),
	)
	return err
}
// Bootstrap bootstraps etcd cluster the very first time.
// After this, just continue to call kill/restart.
func (clus *Cluster) Bootstrap() error {
	// this is the only time that creates request from scratch
	// (sendOperation builds agentRequests[i] on InitialStartEtcd)
	return clus.broadcastOperation(rpcpb.Operation_InitialStartEtcd)
}
// FailArchive sends "FailArchive" operation to every agent.
func (clus *Cluster) FailArchive() error {
	return clus.broadcastOperation(rpcpb.Operation_FailArchive)
}
// Restart sends "Restart" operation to every agent.
func (clus *Cluster) Restart() error {
	return clus.broadcastOperation(rpcpb.Operation_RestartEtcd)
}
  265. func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error {
  266. var wg sync.WaitGroup
  267. wg.Add(len(clus.agentStreams))
  268. errc := make(chan error, len(clus.agentStreams))
  269. for i := range clus.agentStreams {
  270. go func(idx int, o rpcpb.Operation) {
  271. defer wg.Done()
  272. errc <- clus.sendOperation(idx, o)
  273. }(i, op)
  274. }
  275. wg.Wait()
  276. close(errc)
  277. errs := []string{}
  278. for err := range errc {
  279. if err == nil {
  280. continue
  281. }
  282. if err != nil &&
  283. op == rpcpb.Operation_DestroyEtcdAgent &&
  284. strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") {
  285. // agent server has already closed;
  286. // so this error is expected
  287. clus.lg.Info("successfully destroyed all")
  288. continue
  289. }
  290. errs = append(errs, err.Error())
  291. }
  292. if len(errs) == 0 {
  293. return nil
  294. }
  295. return errors.New(strings.Join(errs, ", "))
  296. }
// sendOperation sends one operation to agent idx over its transport
// stream, waits for the response, and — for secure (https) members on
// initial start or restart — persists the TLS client assets returned by
// the agent to disk so tester clients can connect.
func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
	// the full request is built only once; later operations reuse it,
	// swapping just the Operation field
	if op == rpcpb.Operation_InitialStartEtcd {
		clus.agentRequests[idx] = &rpcpb.Request{
			Operation: op,
			Member:    clus.Members[idx],
			Tester:    clus.Tester,
		}
	} else {
		clus.agentRequests[idx].Operation = op
	}

	err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
	clus.lg.Info(
		"sent request",
		zap.String("operation", op.String()),
		zap.String("to", clus.Members[idx].EtcdClientEndpoint),
		zap.Error(err),
	)
	if err != nil {
		return err
	}

	resp, err := clus.agentStreams[idx].Recv()
	if resp != nil {
		clus.lg.Info(
			"received response",
			zap.String("operation", op.String()),
			zap.String("from", clus.Members[idx].EtcdClientEndpoint),
			zap.Bool("success", resp.Success),
			zap.String("status", resp.Status),
			zap.Error(err),
		)
	} else {
		clus.lg.Info(
			"received empty response",
			zap.String("operation", op.String()),
			zap.String("from", clus.Members[idx].EtcdClientEndpoint),
			zap.Error(err),
		)
	}
	if err != nil {
		return err
	}

	if !resp.Success {
		return errors.New(resp.Status)
	}

	// a member is "secure" if any advertised client URL uses https
	m, secure := clus.Members[idx], false
	for _, cu := range m.Etcd.AdvertiseClientURLs {
		u, err := url.Parse(cu)
		if err != nil {
			return err
		}
		if u.Scheme == "https" { // TODO: handle unix
			secure = true
		}
	}

	// store TLS assets from agents/servers onto disk
	if secure && (op == rpcpb.Operation_InitialStartEtcd || op == rpcpb.Operation_RestartEtcd) {
		dirClient := filepath.Join(
			clus.Tester.DataDir,
			clus.Members[idx].Etcd.Name,
			"fixtures",
			"client",
		)
		if err = fileutil.TouchDirAll(dirClient); err != nil {
			return err
		}

		clientCertData := []byte(resp.Member.ClientCertData)
		if len(clientCertData) == 0 {
			return fmt.Errorf("got empty client cert from %q", m.EtcdClientEndpoint)
		}
		clientCertPath := filepath.Join(dirClient, "cert.pem")
		if err = ioutil.WriteFile(clientCertPath, clientCertData, 0644); err != nil { // overwrite if exists
			return err
		}
		resp.Member.ClientCertPath = clientCertPath
		clus.lg.Info(
			"saved client cert file",
			zap.String("path", clientCertPath),
		)

		clientKeyData := []byte(resp.Member.ClientKeyData)
		if len(clientKeyData) == 0 {
			return fmt.Errorf("got empty client key from %q", m.EtcdClientEndpoint)
		}
		clientKeyPath := filepath.Join(dirClient, "key.pem")
		if err = ioutil.WriteFile(clientKeyPath, clientKeyData, 0644); err != nil { // overwrite if exists
			return err
		}
		resp.Member.ClientKeyPath = clientKeyPath
		clus.lg.Info(
			"saved client key file",
			zap.String("path", clientKeyPath),
		)

		// trusted CA is optional (absent with auto TLS)
		clientTrustedCAData := []byte(resp.Member.ClientTrustedCAData)
		if len(clientTrustedCAData) != 0 {
			// TODO: disable this when auto TLS is deprecated
			clientTrustedCAPath := filepath.Join(dirClient, "ca.pem")
			if err = ioutil.WriteFile(clientTrustedCAPath, clientTrustedCAData, 0644); err != nil { // overwrite if exists
				return err
			}
			resp.Member.ClientTrustedCAPath = clientTrustedCAPath
			clus.lg.Info(
				"saved client trusted CA file",
				zap.String("path", clientTrustedCAPath),
			)
		}

		// no need to store peer certs for tester clients

		// adopt the agent's view of the member (now carrying cert paths)
		clus.Members[idx] = resp.Member
	}

	return nil
}
  406. // DestroyEtcdAgents terminates all tester connections to agents and etcd servers.
  407. func (clus *Cluster) DestroyEtcdAgents() {
  408. err := clus.broadcastOperation(rpcpb.Operation_DestroyEtcdAgent)
  409. if err != nil {
  410. clus.lg.Warn("destroying etcd/agents FAIL", zap.Error(err))
  411. } else {
  412. clus.lg.Info("destroying etcd/agents PASS")
  413. }
  414. for i, conn := range clus.agentConns {
  415. err := conn.Close()
  416. clus.lg.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
  417. }
  418. if clus.testerHTTPServer != nil {
  419. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  420. err := clus.testerHTTPServer.Shutdown(ctx)
  421. cancel()
  422. clus.lg.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.Addr), zap.Error(err))
  423. }
  424. }
  425. // WaitHealth ensures all members are healthy
  426. // by writing a test key to etcd cluster.
  427. func (clus *Cluster) WaitHealth() error {
  428. var err error
  429. // wait 60s to check cluster health.
  430. // TODO: set it to a reasonable value. It is set that high because
  431. // follower may use long time to catch up the leader when reboot under
  432. // reasonable workload (https://github.com/coreos/etcd/issues/2698)
  433. for i := 0; i < 60; i++ {
  434. for _, m := range clus.Members {
  435. if err = m.WriteHealthKey(); err != nil {
  436. clus.lg.Warn(
  437. "health check FAIL",
  438. zap.Int("retries", i),
  439. zap.String("endpoint", m.EtcdClientEndpoint),
  440. zap.Error(err),
  441. )
  442. break
  443. }
  444. clus.lg.Info(
  445. "health check PASS",
  446. zap.Int("retries", i),
  447. zap.String("endpoint", m.EtcdClientEndpoint),
  448. )
  449. }
  450. if err == nil {
  451. clus.lg.Info(
  452. "health check ALL PASS",
  453. zap.Int("round", clus.rd),
  454. zap.Int("case", clus.cs),
  455. )
  456. return nil
  457. }
  458. time.Sleep(time.Second)
  459. }
  460. return err
  461. }
  462. // GetLeader returns the index of leader and error if any.
  463. func (clus *Cluster) GetLeader() (int, error) {
  464. for i, m := range clus.Members {
  465. isLeader, err := m.IsLeader()
  466. if isLeader || err != nil {
  467. return i, err
  468. }
  469. }
  470. return 0, fmt.Errorf("no leader found")
  471. }
  472. // maxRev returns the maximum revision found on the cluster.
  473. func (clus *Cluster) maxRev() (rev int64, err error) {
  474. ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
  475. defer cancel()
  476. revc, errc := make(chan int64, len(clus.Members)), make(chan error, len(clus.Members))
  477. for i := range clus.Members {
  478. go func(m *rpcpb.Member) {
  479. mrev, merr := m.Rev(ctx)
  480. revc <- mrev
  481. errc <- merr
  482. }(clus.Members[i])
  483. }
  484. for i := 0; i < len(clus.Members); i++ {
  485. if merr := <-errc; merr != nil {
  486. err = merr
  487. }
  488. if mrev := <-revc; mrev > rev {
  489. rev = mrev
  490. }
  491. }
  492. return rev, err
  493. }
  494. func (clus *Cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
  495. revs := make(map[string]int64)
  496. hashes := make(map[string]int64)
  497. for _, m := range clus.Members {
  498. rev, hash, err := m.RevHash()
  499. if err != nil {
  500. return nil, nil, err
  501. }
  502. revs[m.EtcdClientEndpoint] = rev
  503. hashes[m.EtcdClientEndpoint] = hash
  504. }
  505. return revs, hashes, nil
  506. }
  507. func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
  508. if rev <= 0 {
  509. return nil
  510. }
  511. for i, m := range clus.Members {
  512. clus.lg.Info(
  513. "compact START",
  514. zap.String("endpoint", m.EtcdClientEndpoint),
  515. zap.Int64("compact-revision", rev),
  516. zap.Duration("timeout", timeout),
  517. )
  518. now := time.Now()
  519. cerr := m.Compact(rev, timeout)
  520. succeed := true
  521. if cerr != nil {
  522. if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {
  523. clus.lg.Info(
  524. "compact error is ignored",
  525. zap.String("endpoint", m.EtcdClientEndpoint),
  526. zap.Int64("compact-revision", rev),
  527. zap.Error(cerr),
  528. )
  529. } else {
  530. clus.lg.Warn(
  531. "compact FAIL",
  532. zap.String("endpoint", m.EtcdClientEndpoint),
  533. zap.Int64("compact-revision", rev),
  534. zap.Error(cerr),
  535. )
  536. err = cerr
  537. succeed = false
  538. }
  539. }
  540. if succeed {
  541. clus.lg.Info(
  542. "compact PASS",
  543. zap.String("endpoint", m.EtcdClientEndpoint),
  544. zap.Int64("compact-revision", rev),
  545. zap.Duration("timeout", timeout),
  546. zap.Duration("took", time.Since(now)),
  547. )
  548. }
  549. }
  550. return err
  551. }
  552. func (clus *Cluster) checkCompact(rev int64) error {
  553. if rev == 0 {
  554. return nil
  555. }
  556. for _, m := range clus.Members {
  557. if err := m.CheckCompact(rev); err != nil {
  558. return err
  559. }
  560. }
  561. return nil
  562. }
  563. func (clus *Cluster) defrag() error {
  564. for _, m := range clus.Members {
  565. if err := m.Defrag(); err != nil {
  566. clus.lg.Warn(
  567. "defrag FAIL",
  568. zap.String("endpoint", m.EtcdClientEndpoint),
  569. zap.Error(err),
  570. )
  571. return err
  572. }
  573. clus.lg.Info(
  574. "defrag PASS",
  575. zap.String("endpoint", m.EtcdClientEndpoint),
  576. )
  577. }
  578. clus.lg.Info(
  579. "defrag ALL PASS",
  580. zap.Int("round", clus.rd),
  581. zap.Int("case", clus.cs),
  582. )
  583. return nil
  584. }
// GetFailureDelayDuration computes failure delay duration
// from the configured FailureDelayMs (milliseconds).
func (clus *Cluster) GetFailureDelayDuration() time.Duration {
	return time.Duration(clus.Tester.FailureDelayMs) * time.Millisecond
}
// Report reports the number of modified keys,
// as counted by the current stresser.
func (clus *Cluster) Report() int64 {
	return clus.stresser.ModifiedKeys()
}