cluster.go

// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tester

import (
	"context"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"math/rand"
	"net/http"
	"net/url"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/coreos/etcd/functional/rpcpb"
	"github.com/coreos/etcd/pkg/debugutil"
	"github.com/coreos/etcd/pkg/fileutil"

	"github.com/prometheus/client_golang/prometheus/promhttp"
	"go.uber.org/zap"
	"golang.org/x/time/rate"
	"google.golang.org/grpc"
)

// Cluster defines a tester cluster.
type Cluster struct {
	lg *zap.Logger

	agentConns    []*grpc.ClientConn
	agentClients  []rpcpb.TransportClient
	agentStreams  []rpcpb.Transport_TransportClient
	agentRequests []*rpcpb.Request

	testerHTTPServer *http.Server

	Members []*rpcpb.Member `yaml:"agent-configs"`
	Tester  *rpcpb.Tester   `yaml:"tester-config"`

	failures []Failure

	rateLimiter *rate.Limiter
	stresser    Stresser
	checker     Checker

	currentRevision int64
	rd              int
	cs              int
}

var dialOpts = []grpc.DialOption{
	grpc.WithInsecure(),
	grpc.WithTimeout(5 * time.Second),
	grpc.WithBlock(),
}

// NewCluster creates a client from a tester configuration.
func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
	clus, err := read(lg, fpath)
	if err != nil {
		return nil, err
	}

	clus.agentConns = make([]*grpc.ClientConn, len(clus.Members))
	clus.agentClients = make([]rpcpb.TransportClient, len(clus.Members))
	clus.agentStreams = make([]rpcpb.Transport_TransportClient, len(clus.Members))
	clus.agentRequests = make([]*rpcpb.Request, len(clus.Members))
	clus.failures = make([]Failure, 0)

	for i, ap := range clus.Members {
		var err error
		clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...)
		if err != nil {
			return nil, err
		}
		clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i])
		clus.lg.Info("connected", zap.String("agent-address", ap.AgentAddr))

		clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background())
		if err != nil {
			return nil, err
		}
		clus.lg.Info("created stream", zap.String("agent-address", ap.AgentAddr))
	}

	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())
	if clus.Tester.EnablePprof {
		for p, h := range debugutil.PProfHandlers() {
			mux.Handle(p, h)
		}
	}
	clus.testerHTTPServer = &http.Server{
		Addr:    clus.Tester.Addr,
		Handler: mux,
	}
	go clus.serveTesterServer()

	clus.updateFailures()

	clus.rateLimiter = rate.NewLimiter(
		rate.Limit(int(clus.Tester.StressQPS)),
		int(clus.Tester.StressQPS),
	)

	clus.updateStresserChecker()

	return clus, nil
}
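
// A minimal usage sketch (illustrative only; the config file path and logger
// setup below are assumptions, not part of this package):
//
//	lg, _ := zap.NewProduction()
//	clus, err := NewCluster(lg, "tester.yaml")
//	if err != nil {
//		lg.Fatal("failed to create tester cluster", zap.Error(err))
//	}
//	defer clus.DestroyEtcdAgents()
//	if err = clus.Bootstrap(); err != nil {
//		lg.Fatal("failed to bootstrap etcd cluster", zap.Error(err))
//	}
//	if err = clus.WaitHealth(); err != nil {
//		lg.Fatal("cluster did not become healthy", zap.Error(err))
//	}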

func (clus *Cluster) serveTesterServer() {
	clus.lg.Info(
		"started tester HTTP server",
		zap.String("tester-address", clus.Tester.Addr),
	)
	err := clus.testerHTTPServer.ListenAndServe()
	clus.lg.Info(
		"tester HTTP server returned",
		zap.String("tester-address", clus.Tester.Addr),
		zap.Error(err),
	)
	if err != nil && err != http.ErrServerClosed {
		clus.lg.Fatal("tester HTTP errored", zap.Error(err))
	}
}

func (clus *Cluster) updateFailures() {
	for _, cs := range clus.Tester.FailureCases {
		switch cs {
		case "KILL_ONE_FOLLOWER":
			clus.failures = append(clus.failures, newFailureKillOneFollower())
		case "KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
			clus.failures = append(clus.failures, newFailureKillOneFollowerUntilTriggerSnapshot())
		case "KILL_LEADER":
			clus.failures = append(clus.failures, newFailureKillLeader())
		case "KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT":
			clus.failures = append(clus.failures, newFailureKillLeaderUntilTriggerSnapshot())
		case "KILL_QUORUM":
			clus.failures = append(clus.failures, newFailureKillQuorum())
		case "KILL_ALL":
			clus.failures = append(clus.failures, newFailureKillAll())
		case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER":
			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollower(clus))
		case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollowerUntilTriggerSnapshot())
		case "BLACKHOLE_PEER_PORT_TX_RX_LEADER":
			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeader(clus))
		case "BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeaderUntilTriggerSnapshot())
		case "BLACKHOLE_PEER_PORT_TX_RX_QUORUM":
			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxQuorum(clus))
		case "BLACKHOLE_PEER_PORT_TX_RX_ALL":
			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxAll(clus))
		case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus, true))
		case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus, true))
		case "DELAY_PEER_PORT_TX_RX_LEADER":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus, true))
		case "DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus, true))
		case "DELAY_PEER_PORT_TX_RX_QUORUM":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxQuorum(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxQuorum(clus, true))
		case "DELAY_PEER_PORT_TX_RX_ALL":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_ALL":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus, true))
		case "NO_FAIL_WITH_STRESS":
			clus.failures = append(clus.failures, newFailureNoFailWithStress(clus))
		case "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS":
			clus.failures = append(clus.failures, newFailureNoFailWithNoStressForLiveness(clus))
		case "EXTERNAL":
			clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath))
		case "FAILPOINTS":
			fpFailures, fperr := failpointFailures(clus)
			if len(fpFailures) == 0 {
				clus.lg.Info("no failpoints found!", zap.Error(fperr))
			}
			clus.failures = append(clus.failures, fpFailures...)
		}
	}
}

func (clus *Cluster) failureStrings() (fs []string) {
	fs = make([]string, len(clus.failures))
	for i := range clus.failures {
		fs[i] = clus.failures[i].Desc()
	}
	return fs
}

// UpdateDelayLatencyMs updates the delay latency with a random value
// within the election timeout.
func (clus *Cluster) UpdateDelayLatencyMs() {
	rand.Seed(time.Now().UnixNano())
	clus.Tester.UpdatedDelayLatencyMs = uint32(rand.Int63n(clus.Members[0].Etcd.ElectionTimeoutMs))

	minLatRv := clus.Tester.DelayLatencyMsRv + clus.Tester.DelayLatencyMsRv/5
	if clus.Tester.UpdatedDelayLatencyMs <= minLatRv {
		clus.Tester.UpdatedDelayLatencyMs += minLatRv
	}
}
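
// Worked example (illustrative numbers, not from any shipped configuration):
// with ElectionTimeoutMs = 1000 and DelayLatencyMsRv = 500, the random draw
// falls in [0, 1000) and minLatRv = 500 + 500/5 = 600; a draw of 200 would be
// bumped to 200 + 600 = 800, keeping the updated delay above the configured
// latency deviation.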

func (clus *Cluster) updateStresserChecker() {
	cs := &compositeStresser{}
	for _, m := range clus.Members {
		cs.stressers = append(cs.stressers, newStresser(clus, m))
	}
	clus.stresser = cs

	if clus.Tester.ConsistencyCheck {
		clus.checker = newHashChecker(clus.lg, hashAndRevGetter(clus))
		if schk := cs.Checker(); schk != nil {
			clus.checker = newCompositeChecker([]Checker{clus.checker, schk})
		}
	} else {
		clus.checker = newNoChecker()
	}

	clus.lg.Info("updated stressers")
}

func (clus *Cluster) checkConsistency() (err error) {
	defer func() {
		if err != nil {
			return
		}
		if err = clus.updateRevision(); err != nil {
			clus.lg.Warn(
				"updateRevision failed",
				zap.Error(err),
			)
			return
		}
	}()

	if err = clus.checker.Check(); err != nil {
		clus.lg.Warn(
			"consistency check FAIL",
			zap.Int("round", clus.rd),
			zap.Int("case", clus.cs),
			zap.Error(err),
		)
		return err
	}
	clus.lg.Info(
		"consistency check ALL PASS",
		zap.Int("round", clus.rd),
		zap.Int("case", clus.cs),
		zap.String("desc", clus.failures[clus.cs].Desc()),
	)
	return err
}

// Bootstrap bootstraps the etcd cluster for the very first time.
// After this, just continue to call kill/restart operations.
func (clus *Cluster) Bootstrap() error {
	// this is the only time a request is created from scratch
	return clus.broadcastOperation(rpcpb.Operation_InitialStartEtcd)
}

// FailArchive sends "FailArchive" operation.
func (clus *Cluster) FailArchive() error {
	return clus.broadcastOperation(rpcpb.Operation_FailArchive)
}

// Restart sends "Restart" operation.
func (clus *Cluster) Restart() error {
	return clus.broadcastOperation(rpcpb.Operation_RestartEtcd)
}

func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error {
	var wg sync.WaitGroup
	wg.Add(len(clus.agentStreams))

	errc := make(chan error, len(clus.agentStreams))
	for i := range clus.agentStreams {
		go func(idx int, o rpcpb.Operation) {
			defer wg.Done()
			errc <- clus.sendOperation(idx, o)
		}(i, op)
	}
	wg.Wait()
	close(errc)

	errs := []string{}
	for err := range errc {
		if err == nil {
			continue
		}
		if err != nil {
			destroyed := false
			if op == rpcpb.Operation_DestroyEtcdAgent {
				if err == io.EOF {
					destroyed = true
				}
				if strings.Contains(err.Error(),
					"rpc error: code = Unavailable desc = transport is closing") {
					// agent server has already closed;
					// so this error is expected
					destroyed = true
				}
				if strings.Contains(err.Error(),
					"desc = os: process already finished") {
					destroyed = true
				}
			}
			if !destroyed {
				errs = append(errs, err.Error())
			}
		}
	}

	if len(errs) == 0 {
		return nil
	}
	return errors.New(strings.Join(errs, ", "))
}

func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
	if op == rpcpb.Operation_InitialStartEtcd {
		clus.agentRequests[idx] = &rpcpb.Request{
			Operation: op,
			Member:    clus.Members[idx],
			Tester:    clus.Tester,
		}
	} else {
		clus.agentRequests[idx].Operation = op
	}

	err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
	clus.lg.Info(
		"sent request",
		zap.String("operation", op.String()),
		zap.String("to", clus.Members[idx].EtcdClientEndpoint),
		zap.Error(err),
	)
	if err != nil {
		return err
	}

	resp, err := clus.agentStreams[idx].Recv()
	if resp != nil {
		clus.lg.Info(
			"received response",
			zap.String("operation", op.String()),
			zap.String("from", clus.Members[idx].EtcdClientEndpoint),
			zap.Bool("success", resp.Success),
			zap.String("status", resp.Status),
			zap.Error(err),
		)
	} else {
		clus.lg.Info(
			"received empty response",
			zap.String("operation", op.String()),
			zap.String("from", clus.Members[idx].EtcdClientEndpoint),
			zap.Error(err),
		)
	}
	if err != nil {
		return err
	}

	if !resp.Success {
		return errors.New(resp.Status)
	}

	m, secure := clus.Members[idx], false
	for _, cu := range m.Etcd.AdvertiseClientURLs {
		u, err := url.Parse(cu)
		if err != nil {
			return err
		}
		if u.Scheme == "https" { // TODO: handle unix
			secure = true
		}
	}
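
	// Note on the block below: with the fields used here, TLS client assets end
	// up roughly under <Tester.DataDir>/<member name>/fixtures/client/ as
	// cert.pem, key.pem, and (only if the agent returned trusted-CA data) ca.pem.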
	// store TLS assets from agents/servers onto disk
	if secure && (op == rpcpb.Operation_InitialStartEtcd || op == rpcpb.Operation_RestartEtcd) {
		dirClient := filepath.Join(
			clus.Tester.DataDir,
			clus.Members[idx].Etcd.Name,
			"fixtures",
			"client",
		)
		if err = fileutil.TouchDirAll(dirClient); err != nil {
			return err
		}

		clientCertData := []byte(resp.Member.ClientCertData)
		if len(clientCertData) == 0 {
			return fmt.Errorf("got empty client cert from %q", m.EtcdClientEndpoint)
		}
		clientCertPath := filepath.Join(dirClient, "cert.pem")
		if err = ioutil.WriteFile(clientCertPath, clientCertData, 0644); err != nil { // overwrite if exists
			return err
		}
		resp.Member.ClientCertPath = clientCertPath
		clus.lg.Info(
			"saved client cert file",
			zap.String("path", clientCertPath),
		)

		clientKeyData := []byte(resp.Member.ClientKeyData)
		if len(clientKeyData) == 0 {
			return fmt.Errorf("got empty client key from %q", m.EtcdClientEndpoint)
		}
		clientKeyPath := filepath.Join(dirClient, "key.pem")
		if err = ioutil.WriteFile(clientKeyPath, clientKeyData, 0644); err != nil { // overwrite if exists
			return err
		}
		resp.Member.ClientKeyPath = clientKeyPath
		clus.lg.Info(
			"saved client key file",
			zap.String("path", clientKeyPath),
		)

		clientTrustedCAData := []byte(resp.Member.ClientTrustedCAData)
		if len(clientTrustedCAData) != 0 {
			// TODO: disable this when auto TLS is deprecated
			clientTrustedCAPath := filepath.Join(dirClient, "ca.pem")
			if err = ioutil.WriteFile(clientTrustedCAPath, clientTrustedCAData, 0644); err != nil { // overwrite if exists
				return err
			}
			resp.Member.ClientTrustedCAPath = clientTrustedCAPath
			clus.lg.Info(
				"saved client trusted CA file",
				zap.String("path", clientTrustedCAPath),
			)
		}

		// no need to store peer certs for tester clients
		clus.Members[idx] = resp.Member
	}

	return nil
}

// DestroyEtcdAgents terminates all tester connections to agents and etcd servers.
func (clus *Cluster) DestroyEtcdAgents() {
	err := clus.broadcastOperation(rpcpb.Operation_DestroyEtcdAgent)
	if err != nil {
		clus.lg.Warn("destroying etcd/agents FAIL", zap.Error(err))
	} else {
		clus.lg.Info("destroying etcd/agents PASS")
	}

	for i, conn := range clus.agentConns {
		err := conn.Close()
		clus.lg.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
	}

	if clus.testerHTTPServer != nil {
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		err := clus.testerHTTPServer.Shutdown(ctx)
		cancel()
		clus.lg.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.Addr), zap.Error(err))
	}
}

// WaitHealth ensures all members are healthy
// by writing a test key to the etcd cluster.
func (clus *Cluster) WaitHealth() error {
	var err error
	// wait 60s to check cluster health.
	// TODO: set it to a reasonable value. It is set this high because a
	// follower may take a long time to catch up with the leader after a
	// reboot under a reasonable workload (https://github.com/coreos/etcd/issues/2698).
	for i := 0; i < 60; i++ {
		for _, m := range clus.Members {
			if err = m.WriteHealthKey(); err != nil {
				clus.lg.Warn(
					"health check FAIL",
					zap.Int("retries", i),
					zap.String("endpoint", m.EtcdClientEndpoint),
					zap.Error(err),
				)
				break
			}
			clus.lg.Info(
				"health check PASS",
				zap.Int("retries", i),
				zap.String("endpoint", m.EtcdClientEndpoint),
			)
		}
		if err == nil {
			clus.lg.Info(
				"health check ALL PASS",
				zap.Int("round", clus.rd),
				zap.Int("case", clus.cs),
			)
			return nil
		}
		time.Sleep(time.Second)
	}
	return err
}

// GetLeader returns the index of the leader member and an error, if any.
func (clus *Cluster) GetLeader() (int, error) {
	for i, m := range clus.Members {
		isLeader, err := m.IsLeader()
		if isLeader || err != nil {
			return i, err
		}
	}
	return 0, fmt.Errorf("no leader found")
}

// maxRev returns the maximum revision found on the cluster.
func (clus *Cluster) maxRev() (rev int64, err error) {
	ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
	defer cancel()
	revc, errc := make(chan int64, len(clus.Members)), make(chan error, len(clus.Members))
	for i := range clus.Members {
		go func(m *rpcpb.Member) {
			mrev, merr := m.Rev(ctx)
			revc <- mrev
			errc <- merr
		}(clus.Members[i])
	}
	for i := 0; i < len(clus.Members); i++ {
		if merr := <-errc; merr != nil {
			err = merr
		}
		if mrev := <-revc; mrev > rev {
			rev = mrev
		}
	}
	return rev, err
}

func (clus *Cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
	revs := make(map[string]int64)
	hashes := make(map[string]int64)
	for _, m := range clus.Members {
		rev, hash, err := m.RevHash()
		if err != nil {
			return nil, nil, err
		}
		revs[m.EtcdClientEndpoint] = rev
		hashes[m.EtcdClientEndpoint] = hash
	}
	return revs, hashes, nil
}

func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
	if rev <= 0 {
		return nil
	}

	for i, m := range clus.Members {
		clus.lg.Info(
			"compact START",
			zap.String("endpoint", m.EtcdClientEndpoint),
			zap.Int64("compact-revision", rev),
			zap.Duration("timeout", timeout),
		)
		now := time.Now()
		cerr := m.Compact(rev, timeout)
		succeed := true
		if cerr != nil {
			if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {
				clus.lg.Info(
					"compact error is ignored",
					zap.String("endpoint", m.EtcdClientEndpoint),
					zap.Int64("compact-revision", rev),
					zap.Error(cerr),
				)
			} else {
				clus.lg.Warn(
					"compact FAIL",
					zap.String("endpoint", m.EtcdClientEndpoint),
					zap.Int64("compact-revision", rev),
					zap.Error(cerr),
				)
				err = cerr
				succeed = false
			}
		}

		if succeed {
			clus.lg.Info(
				"compact PASS",
				zap.String("endpoint", m.EtcdClientEndpoint),
				zap.Int64("compact-revision", rev),
				zap.Duration("timeout", timeout),
				zap.Duration("took", time.Since(now)),
			)
		}
	}
	return err
}

func (clus *Cluster) checkCompact(rev int64) error {
	if rev == 0 {
		return nil
	}
	for _, m := range clus.Members {
		if err := m.CheckCompact(rev); err != nil {
			return err
		}
	}
	return nil
}

func (clus *Cluster) defrag() error {
	for _, m := range clus.Members {
		if err := m.Defrag(); err != nil {
			clus.lg.Warn(
				"defrag FAIL",
				zap.String("endpoint", m.EtcdClientEndpoint),
				zap.Error(err),
			)
			return err
		}
		clus.lg.Info(
			"defrag PASS",
			zap.String("endpoint", m.EtcdClientEndpoint),
		)
	}
	clus.lg.Info(
		"defrag ALL PASS",
		zap.Int("round", clus.rd),
		zap.Int("case", clus.cs),
		zap.Int("case-total", len(clus.failures)),
	)
	return nil
}

// GetFailureDelayDuration computes failure delay duration.
func (clus *Cluster) GetFailureDelayDuration() time.Duration {
	return time.Duration(clus.Tester.FailureDelayMs) * time.Millisecond
}

// Report reports the number of modified keys.
func (clus *Cluster) Report() int64 {
	return clus.stresser.ModifiedKeys()
}