cluster.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "io/ioutil"
  20. "math/rand"
  21. "net/http"
  22. "path/filepath"
  23. "strings"
  24. "time"
  25. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  26. "github.com/coreos/etcd/pkg/debugutil"
  27. "github.com/coreos/etcd/tools/functional-tester/rpcpb"
  28. "github.com/prometheus/client_golang/prometheus/promhttp"
  29. "go.uber.org/zap"
  30. "golang.org/x/time/rate"
  31. "google.golang.org/grpc"
  32. yaml "gopkg.in/yaml.v2"
  33. )
// Cluster defines tester cluster.
type Cluster struct {
	lg *zap.Logger

	// agentConns/agentClients/agentStreams/agentRequests are parallel
	// slices indexed like Members: slot i talks to the agent of Members[i].
	agentConns    []*grpc.ClientConn
	agentClients  []rpcpb.TransportClient
	agentStreams  []rpcpb.Transport_TransportClient
	agentRequests []*rpcpb.Request

	// testerHTTPServer serves /metrics (and pprof handlers when enabled).
	testerHTTPServer *http.Server

	// Members and Tester are populated by unmarshaling the YAML
	// configuration file (see newCluster).
	Members []*rpcpb.Member `yaml:"agent-configs"`
	Tester  *rpcpb.Tester   `yaml:"tester-config"`

	// failures is the schedule of failure cases to inject.
	failures []Failure

	rateLimiter *rate.Limiter
	stresser    Stresser
	checker     Checker

	// currentRevision is the last known cluster revision; presumably
	// refreshed by updateRevision (defined elsewhere) — confirm there.
	currentRevision int64
	// rd is the current test round; cs is the current case index.
	rd int
	cs int
}
  52. func newCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
  53. lg.Info("reading configuration file", zap.String("path", fpath))
  54. bts, err := ioutil.ReadFile(fpath)
  55. if err != nil {
  56. return nil, err
  57. }
  58. lg.Info("opened configuration file", zap.String("path", fpath))
  59. clus := &Cluster{lg: lg}
  60. if err = yaml.Unmarshal(bts, clus); err != nil {
  61. return nil, err
  62. }
  63. for i := range clus.Members {
  64. if clus.Members[i].BaseDir == "" {
  65. return nil, fmt.Errorf("Members[i].BaseDir cannot be empty (got %q)", clus.Members[i].BaseDir)
  66. }
  67. if clus.Members[i].EtcdLogPath == "" {
  68. return nil, fmt.Errorf("Members[i].EtcdLogPath cannot be empty (got %q)", clus.Members[i].EtcdLogPath)
  69. }
  70. if clus.Members[i].Etcd.Name == "" {
  71. return nil, fmt.Errorf("'--name' cannot be empty (got %+v)", clus.Members[i])
  72. }
  73. if clus.Members[i].Etcd.DataDir == "" {
  74. return nil, fmt.Errorf("'--data-dir' cannot be empty (got %+v)", clus.Members[i])
  75. }
  76. if clus.Members[i].Etcd.SnapshotCount == 0 {
  77. return nil, fmt.Errorf("'--snapshot-count' cannot be 0 (got %+v)", clus.Members[i].Etcd.SnapshotCount)
  78. }
  79. if clus.Members[i].Etcd.DataDir == "" {
  80. return nil, fmt.Errorf("'--data-dir' cannot be empty (got %q)", clus.Members[i].Etcd.DataDir)
  81. }
  82. if clus.Members[i].Etcd.WALDir == "" {
  83. clus.Members[i].Etcd.WALDir = filepath.Join(clus.Members[i].Etcd.DataDir, "member", "wal")
  84. }
  85. port := ""
  86. listenClientPorts := make([]string, len(clus.Members))
  87. for i, u := range clus.Members[i].Etcd.ListenClientURLs {
  88. if !isValidURL(u) {
  89. return nil, fmt.Errorf("'--listen-client-urls' has valid URL %q", u)
  90. }
  91. listenClientPorts[i], err = getPort(u)
  92. if err != nil {
  93. return nil, fmt.Errorf("'--listen-client-urls' has no port %q", u)
  94. }
  95. }
  96. for i, u := range clus.Members[i].Etcd.AdvertiseClientURLs {
  97. if !isValidURL(u) {
  98. return nil, fmt.Errorf("'--advertise-client-urls' has valid URL %q", u)
  99. }
  100. port, err = getPort(u)
  101. if err != nil {
  102. return nil, fmt.Errorf("'--advertise-client-urls' has no port %q", u)
  103. }
  104. if clus.Members[i].EtcdClientProxy && listenClientPorts[i] == port {
  105. return nil, fmt.Errorf("clus.Members[%d] requires client port proxy, but advertise port %q conflicts with listener port %q", i, port, listenClientPorts[i])
  106. }
  107. }
  108. listenPeerPorts := make([]string, len(clus.Members))
  109. for i, u := range clus.Members[i].Etcd.ListenPeerURLs {
  110. if !isValidURL(u) {
  111. return nil, fmt.Errorf("'--listen-peer-urls' has valid URL %q", u)
  112. }
  113. listenPeerPorts[i], err = getPort(u)
  114. if err != nil {
  115. return nil, fmt.Errorf("'--listen-peer-urls' has no port %q", u)
  116. }
  117. }
  118. for i, u := range clus.Members[i].Etcd.InitialAdvertisePeerURLs {
  119. if !isValidURL(u) {
  120. return nil, fmt.Errorf("'--initial-advertise-peer-urls' has valid URL %q", u)
  121. }
  122. port, err = getPort(u)
  123. if err != nil {
  124. return nil, fmt.Errorf("'--initial-advertise-peer-urls' has no port %q", u)
  125. }
  126. if clus.Members[i].EtcdPeerProxy && listenPeerPorts[i] == port {
  127. return nil, fmt.Errorf("clus.Members[%d] requires peer port proxy, but advertise port %q conflicts with listener port %q", i, port, listenPeerPorts[i])
  128. }
  129. }
  130. if !strings.HasPrefix(clus.Members[i].EtcdLogPath, clus.Members[i].BaseDir) {
  131. return nil, fmt.Errorf("EtcdLogPath must be prefixed with BaseDir (got %q)", clus.Members[i].EtcdLogPath)
  132. }
  133. if !strings.HasPrefix(clus.Members[i].Etcd.DataDir, clus.Members[i].BaseDir) {
  134. return nil, fmt.Errorf("Etcd.DataDir must be prefixed with BaseDir (got %q)", clus.Members[i].Etcd.DataDir)
  135. }
  136. // TODO: support separate WALDir that can be handled via failure-archive
  137. if !strings.HasPrefix(clus.Members[i].Etcd.WALDir, clus.Members[i].BaseDir) {
  138. return nil, fmt.Errorf("Etcd.WALDir must be prefixed with BaseDir (got %q)", clus.Members[i].Etcd.WALDir)
  139. }
  140. if len(clus.Tester.FailureCases) == 0 {
  141. return nil, errors.New("FailureCases not found")
  142. }
  143. }
  144. for _, v := range clus.Tester.FailureCases {
  145. if _, ok := rpcpb.FailureCase_value[v]; !ok {
  146. return nil, fmt.Errorf("%q is not defined in 'rpcpb.FailureCase_value'", v)
  147. }
  148. }
  149. for _, v := range clus.Tester.StressTypes {
  150. if _, ok := rpcpb.StressType_value[v]; !ok {
  151. return nil, fmt.Errorf("StressType is unknown; got %q", v)
  152. }
  153. }
  154. if clus.Tester.StressKeySuffixRangeTxn > 100 {
  155. return nil, fmt.Errorf("StressKeySuffixRangeTxn maximum value is 100, got %v", clus.Tester.StressKeySuffixRangeTxn)
  156. }
  157. if clus.Tester.StressKeyTxnOps > 64 {
  158. return nil, fmt.Errorf("StressKeyTxnOps maximum value is 64, got %v", clus.Tester.StressKeyTxnOps)
  159. }
  160. return clus, err
  161. }
// TODO: status handler

// dialOpts are the default gRPC dial options for agent connections:
// plaintext transport, blocking dial, 5-second dial timeout.
// NOTE(review): grpc.WithTimeout is deprecated in newer grpc-go releases
// in favor of grpc.DialContext with context.WithTimeout — confirm before
// upgrading the grpc dependency.
var dialOpts = []grpc.DialOption{
	grpc.WithInsecure(),
	grpc.WithTimeout(5 * time.Second),
	grpc.WithBlock(),
}
  168. // NewCluster creates a client from a tester configuration.
  169. func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
  170. clus, err := newCluster(lg, fpath)
  171. if err != nil {
  172. return nil, err
  173. }
  174. clus.agentConns = make([]*grpc.ClientConn, len(clus.Members))
  175. clus.agentClients = make([]rpcpb.TransportClient, len(clus.Members))
  176. clus.agentStreams = make([]rpcpb.Transport_TransportClient, len(clus.Members))
  177. clus.agentRequests = make([]*rpcpb.Request, len(clus.Members))
  178. clus.failures = make([]Failure, 0)
  179. for i, ap := range clus.Members {
  180. clus.lg.Info("connecting", zap.String("agent-address", ap.AgentAddr))
  181. var err error
  182. clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...)
  183. if err != nil {
  184. return nil, err
  185. }
  186. clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i])
  187. clus.lg.Info("connected", zap.String("agent-address", ap.AgentAddr))
  188. clus.lg.Info("creating stream", zap.String("agent-address", ap.AgentAddr))
  189. clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background())
  190. if err != nil {
  191. return nil, err
  192. }
  193. clus.lg.Info("created stream", zap.String("agent-address", ap.AgentAddr))
  194. }
  195. mux := http.NewServeMux()
  196. mux.Handle("/metrics", promhttp.Handler())
  197. if clus.Tester.EnablePprof {
  198. for p, h := range debugutil.PProfHandlers() {
  199. mux.Handle(p, h)
  200. }
  201. }
  202. clus.testerHTTPServer = &http.Server{
  203. Addr: clus.Tester.TesterAddr,
  204. Handler: mux,
  205. }
  206. go clus.serveTesterServer()
  207. clus.updateFailures()
  208. clus.rateLimiter = rate.NewLimiter(
  209. rate.Limit(int(clus.Tester.StressQPS)),
  210. int(clus.Tester.StressQPS),
  211. )
  212. clus.updateStresserChecker()
  213. return clus, nil
  214. }
  215. func (clus *Cluster) serveTesterServer() {
  216. clus.lg.Info(
  217. "started tester HTTP server",
  218. zap.String("tester-address", clus.Tester.TesterAddr),
  219. )
  220. err := clus.testerHTTPServer.ListenAndServe()
  221. clus.lg.Info(
  222. "tester HTTP server returned",
  223. zap.String("tester-address", clus.Tester.TesterAddr),
  224. zap.Error(err),
  225. )
  226. if err != nil && err != http.ErrServerClosed {
  227. clus.lg.Fatal("tester HTTP errored", zap.Error(err))
  228. }
  229. }
  230. func (clus *Cluster) updateFailures() {
  231. for _, cs := range clus.Tester.FailureCases {
  232. switch cs {
  233. case "KILL_ONE_FOLLOWER":
  234. clus.failures = append(clus.failures, newFailureKillOneFollower())
  235. case "KILL_LEADER":
  236. clus.failures = append(clus.failures, newFailureKillLeader())
  237. case "KILL_ONE_FOLLOWER_FOR_LONG":
  238. clus.failures = append(clus.failures, newFailureKillOneFollowerForLongTime())
  239. case "KILL_LEADER_FOR_LONG":
  240. clus.failures = append(clus.failures, newFailureKillLeaderForLongTime())
  241. case "KILL_QUORUM":
  242. clus.failures = append(clus.failures, newFailureKillQuorum())
  243. case "KILL_ALL":
  244. clus.failures = append(clus.failures, newFailureKillAll())
  245. case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER":
  246. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollower())
  247. case "BLACKHOLE_PEER_PORT_TX_RX_LEADER":
  248. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeader())
  249. case "BLACKHOLE_PEER_PORT_TX_RX_ALL":
  250. clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxAll())
  251. case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
  252. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower())
  253. case "DELAY_PEER_PORT_TX_RX_LEADER":
  254. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader())
  255. case "DELAY_PEER_PORT_TX_RX_ALL":
  256. clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll())
  257. case "FAILPOINTS":
  258. fpFailures, fperr := failpointFailures(clus)
  259. if len(fpFailures) == 0 {
  260. clus.lg.Info("no failpoints found!", zap.Error(fperr))
  261. }
  262. clus.failures = append(clus.failures, fpFailures...)
  263. case "NO_FAIL":
  264. clus.failures = append(clus.failures, newFailureNoOp())
  265. case "EXTERNAL":
  266. clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath))
  267. }
  268. }
  269. }
  270. func (clus *Cluster) failureStrings() (fs []string) {
  271. fs = make([]string, len(clus.failures))
  272. for i := range clus.failures {
  273. fs[i] = clus.failures[i].Desc()
  274. }
  275. return fs
  276. }
  277. func (clus *Cluster) shuffleFailures() {
  278. rand.Seed(time.Now().UnixNano())
  279. offset := rand.Intn(1000)
  280. n := len(clus.failures)
  281. cp := coprime(n)
  282. clus.lg.Info("shuffling test failure cases", zap.Int("total", n))
  283. fs := make([]Failure, n)
  284. for i := 0; i < n; i++ {
  285. fs[i] = clus.failures[(cp*i+offset)%n]
  286. }
  287. clus.failures = fs
  288. clus.lg.Info("shuffled test failure cases", zap.Int("total", n))
  289. }
  290. /*
  291. x and y of GCD 1 are coprime to each other
  292. x1 = ( coprime of n * idx1 + offset ) % n
  293. x2 = ( coprime of n * idx2 + offset ) % n
  294. (x2 - x1) = coprime of n * (idx2 - idx1) % n
  295. = (idx2 - idx1) = 1
  296. Consecutive x's are guaranteed to be distinct
  297. */
  298. func coprime(n int) int {
  299. coprime := 1
  300. for i := n / 2; i < n; i++ {
  301. if gcd(i, n) == 1 {
  302. coprime = i
  303. break
  304. }
  305. }
  306. return coprime
  307. }
  308. func gcd(x, y int) int {
  309. if y == 0 {
  310. return x
  311. }
  312. return gcd(y, x%y)
  313. }
  314. func (clus *Cluster) updateStresserChecker() {
  315. clus.lg.Info(
  316. "updating stressers",
  317. zap.Int("round", clus.rd),
  318. zap.Int("case", clus.cs),
  319. )
  320. cs := &compositeStresser{}
  321. for idx := range clus.Members {
  322. cs.stressers = append(cs.stressers, newStresser(clus, idx))
  323. }
  324. clus.stresser = cs
  325. if clus.Tester.ConsistencyCheck {
  326. clus.checker = newHashChecker(clus.lg, hashAndRevGetter(clus))
  327. if schk := cs.Checker(); schk != nil {
  328. clus.checker = newCompositeChecker([]Checker{clus.checker, schk})
  329. }
  330. } else {
  331. clus.checker = newNoChecker()
  332. }
  333. clus.lg.Info(
  334. "updated stressers",
  335. zap.Int("round", clus.rd),
  336. zap.Int("case", clus.cs),
  337. )
  338. }
  339. func (clus *Cluster) startStresser() (err error) {
  340. clus.lg.Info(
  341. "starting stressers",
  342. zap.Int("round", clus.rd),
  343. zap.Int("case", clus.cs),
  344. )
  345. err = clus.stresser.Stress()
  346. clus.lg.Info(
  347. "started stressers",
  348. zap.Int("round", clus.rd),
  349. zap.Int("case", clus.cs),
  350. )
  351. return err
  352. }
  353. func (clus *Cluster) closeStresser() {
  354. clus.lg.Info(
  355. "closing stressers",
  356. zap.Int("round", clus.rd),
  357. zap.Int("case", clus.cs),
  358. )
  359. clus.stresser.Close()
  360. clus.lg.Info(
  361. "closed stressers",
  362. zap.Int("round", clus.rd),
  363. zap.Int("case", clus.cs),
  364. )
  365. }
  366. func (clus *Cluster) pauseStresser() {
  367. clus.lg.Info(
  368. "pausing stressers",
  369. zap.Int("round", clus.rd),
  370. zap.Int("case", clus.cs),
  371. )
  372. clus.stresser.Pause()
  373. clus.lg.Info(
  374. "paused stressers",
  375. zap.Int("round", clus.rd),
  376. zap.Int("case", clus.cs),
  377. )
  378. }
// checkConsistency runs the configured checker against the cluster.
// When the check succeeds, the deferred block refreshes the cluster
// revision and restarts the stressers; note that the defer may overwrite
// the named return err with the result of updateRevision or startStresser.
func (clus *Cluster) checkConsistency() (err error) {
	defer func() {
		// only run follow-up work after a clean check
		if err != nil {
			return
		}
		if err = clus.updateRevision(); err != nil {
			clus.lg.Warn(
				"updateRevision failed",
				zap.Error(err),
			)
			return
		}
		// resume stressing; its error becomes this function's result
		err = clus.startStresser()
	}()
	clus.lg.Info(
		"checking consistency and invariant of cluster",
		zap.Int("round", clus.rd),
		zap.Int("case", clus.cs),
		zap.String("desc", clus.failures[clus.cs].Desc()),
	)
	if err = clus.checker.Check(); err != nil {
		clus.lg.Warn(
			"checker.Check failed",
			zap.Error(err),
		)
		return err
	}
	clus.lg.Info(
		"checked consistency and invariant of cluster",
		zap.Int("round", clus.rd),
		zap.Int("case", clus.cs),
		zap.String("desc", clus.failures[clus.cs].Desc()),
	)
	return err
}
// Bootstrap bootstraps etcd cluster the very first time.
// After this, just continue to call kill/restart.
func (clus *Cluster) Bootstrap() error {
	// this is the only time that creates request from scratch;
	// all later operations reuse the cached per-member request
	// (see sendOperation)
	return clus.broadcastOperation(rpcpb.Operation_InitialStartEtcd)
}
// FailArchive sends "FailArchive" operation, asking every agent to
// archive the current failure artifacts.
func (clus *Cluster) FailArchive() error {
	return clus.broadcastOperation(rpcpb.Operation_FailArchive)
}
// Restart sends "Restart" operation, asking every agent to restart
// its etcd server.
func (clus *Cluster) Restart() error {
	return clus.broadcastOperation(rpcpb.Operation_RestartEtcd)
}
  428. func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error {
  429. for i := range clus.agentStreams {
  430. err := clus.sendOperation(i, op)
  431. if err != nil {
  432. if op == rpcpb.Operation_DestroyEtcdAgent &&
  433. strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") {
  434. // agent server has already closed;
  435. // so this error is expected
  436. clus.lg.Info(
  437. "successfully destroyed",
  438. zap.String("member", clus.Members[i].EtcdClientEndpoint),
  439. )
  440. continue
  441. }
  442. return err
  443. }
  444. }
  445. return nil
  446. }
// sendOperation sends op to the agent at index idx over its transport
// stream and waits for the agent's response.
//
// For Operation_InitialStartEtcd the request is built from scratch with
// the member and tester configuration; every later operation reuses the
// cached request and only swaps the operation field.
// Returns the stream send/recv error, or an error built from the agent's
// status when the agent reports Success == false.
func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
	if op == rpcpb.Operation_InitialStartEtcd {
		clus.agentRequests[idx] = &rpcpb.Request{
			Operation: op,
			Member: clus.Members[idx],
			Tester: clus.Tester,
		}
	} else {
		// reuse the request created at initial start
		clus.agentRequests[idx].Operation = op
	}
	clus.lg.Info(
		"sending request",
		zap.String("operation", op.String()),
		zap.String("to", clus.Members[idx].EtcdClientEndpoint),
	)
	err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
	clus.lg.Info(
		"sent request",
		zap.String("operation", op.String()),
		zap.String("to", clus.Members[idx].EtcdClientEndpoint),
		zap.Error(err),
	)
	if err != nil {
		return err
	}
	clus.lg.Info(
		"receiving response",
		zap.String("operation", op.String()),
		zap.String("from", clus.Members[idx].EtcdClientEndpoint),
	)
	resp, err := clus.agentStreams[idx].Recv()
	// log whatever arrived before acting on the error
	if resp != nil {
		clus.lg.Info(
			"received response",
			zap.String("operation", op.String()),
			zap.String("from", clus.Members[idx].EtcdClientEndpoint),
			zap.Bool("success", resp.Success),
			zap.String("status", resp.Status),
			zap.Error(err),
		)
	} else {
		clus.lg.Info(
			"received empty response",
			zap.String("operation", op.String()),
			zap.String("from", clus.Members[idx].EtcdClientEndpoint),
			zap.Error(err),
		)
	}
	if err != nil {
		return err
	}
	// the agent reported a failure; surface its status as the error
	if !resp.Success {
		err = errors.New(resp.Status)
	}
	return err
}
  503. // DestroyEtcdAgents terminates all tester connections to agents and etcd servers.
  504. func (clus *Cluster) DestroyEtcdAgents() {
  505. clus.lg.Info("destroying etcd servers and agents")
  506. err := clus.broadcastOperation(rpcpb.Operation_DestroyEtcdAgent)
  507. if err != nil {
  508. clus.lg.Warn("failed to destroy etcd servers and agents", zap.Error(err))
  509. } else {
  510. clus.lg.Info("destroyed etcd servers and agents")
  511. }
  512. for i, conn := range clus.agentConns {
  513. clus.lg.Info("closing connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr))
  514. err := conn.Close()
  515. clus.lg.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
  516. }
  517. if clus.testerHTTPServer != nil {
  518. clus.lg.Info("closing tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr))
  519. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  520. err := clus.testerHTTPServer.Shutdown(ctx)
  521. cancel()
  522. clus.lg.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr), zap.Error(err))
  523. }
  524. }
  525. // WaitHealth ensures all members are healthy
  526. // by writing a test key to etcd cluster.
  527. func (clus *Cluster) WaitHealth() error {
  528. var err error
  529. // wait 60s to check cluster health.
  530. // TODO: set it to a reasonable value. It is set that high because
  531. // follower may use long time to catch up the leader when reboot under
  532. // reasonable workload (https://github.com/coreos/etcd/issues/2698)
  533. for i := 0; i < 60; i++ {
  534. for _, m := range clus.Members {
  535. clus.lg.Info(
  536. "writing health key",
  537. zap.Int("retries", i),
  538. zap.String("endpoint", m.EtcdClientEndpoint),
  539. )
  540. if err = m.WriteHealthKey(); err != nil {
  541. clus.lg.Warn(
  542. "writing health key failed",
  543. zap.Int("retries", i),
  544. zap.String("endpoint", m.EtcdClientEndpoint),
  545. zap.Error(err),
  546. )
  547. break
  548. }
  549. clus.lg.Info(
  550. "wrote health key",
  551. zap.Int("retries", i),
  552. zap.String("endpoint", m.EtcdClientEndpoint),
  553. )
  554. }
  555. if err == nil {
  556. clus.lg.Info(
  557. "writing health key success on all members",
  558. zap.Int("retries", i),
  559. )
  560. return nil
  561. }
  562. time.Sleep(time.Second)
  563. }
  564. return err
  565. }
  566. // GetLeader returns the index of leader and error if any.
  567. func (clus *Cluster) GetLeader() (int, error) {
  568. for i, m := range clus.Members {
  569. isLeader, err := m.IsLeader()
  570. if isLeader || err != nil {
  571. return i, err
  572. }
  573. }
  574. return 0, fmt.Errorf("no leader found")
  575. }
  576. // maxRev returns the maximum revision found on the cluster.
  577. func (clus *Cluster) maxRev() (rev int64, err error) {
  578. ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
  579. defer cancel()
  580. revc, errc := make(chan int64, len(clus.Members)), make(chan error, len(clus.Members))
  581. for i := range clus.Members {
  582. go func(m *rpcpb.Member) {
  583. mrev, merr := m.Rev(ctx)
  584. revc <- mrev
  585. errc <- merr
  586. }(clus.Members[i])
  587. }
  588. for i := 0; i < len(clus.Members); i++ {
  589. if merr := <-errc; merr != nil {
  590. err = merr
  591. }
  592. if mrev := <-revc; mrev > rev {
  593. rev = mrev
  594. }
  595. }
  596. return rev, err
  597. }
  598. func (clus *Cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
  599. revs := make(map[string]int64)
  600. hashes := make(map[string]int64)
  601. for _, m := range clus.Members {
  602. rev, hash, err := m.RevHash()
  603. if err != nil {
  604. return nil, nil, err
  605. }
  606. revs[m.EtcdClientEndpoint] = rev
  607. hashes[m.EtcdClientEndpoint] = hash
  608. }
  609. return revs, hashes, nil
  610. }
// compactKV issues a physical compaction at revision rev to every member,
// dialing each member's gRPC endpoint in turn. A rev <= 0 is a no-op.
// Failures are recorded but remaining members are still tried; the last
// failure (if any) is returned.
func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
	if rev <= 0 {
		return nil
	}
	for i, m := range clus.Members {
		conn, derr := m.DialEtcdGRPCServer()
		if derr != nil {
			clus.lg.Warn(
				"compactKV dial failed",
				zap.String("endpoint", m.EtcdClientEndpoint),
				zap.Error(derr),
			)
			err = derr
			continue
		}
		kvc := pb.NewKVClient(conn)
		clus.lg.Info(
			"compacting",
			zap.String("endpoint", m.EtcdClientEndpoint),
			zap.Int64("compact-revision", rev),
			zap.Duration("timeout", timeout),
		)
		now := time.Now()
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		_, cerr := kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true}, grpc.FailFast(false))
		cancel()
		conn.Close()
		succeed := true
		if cerr != nil {
			// for members after the first, "already compacted" just means an
			// earlier member's compaction has propagated; not a failure
			if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {
				clus.lg.Info(
					"compact error is ignored",
					zap.String("endpoint", m.EtcdClientEndpoint),
					zap.Int64("compact-revision", rev),
					zap.Error(cerr),
				)
			} else {
				clus.lg.Warn(
					"compact failed",
					zap.String("endpoint", m.EtcdClientEndpoint),
					zap.Int64("compact-revision", rev),
					zap.Error(cerr),
				)
				err = cerr
				succeed = false
			}
		}
		if succeed {
			clus.lg.Info(
				"compacted",
				zap.String("endpoint", m.EtcdClientEndpoint),
				zap.Int64("compact-revision", rev),
				zap.Duration("timeout", timeout),
				zap.Duration("took", time.Since(now)),
			)
		}
	}
	return err
}
  670. func (clus *Cluster) checkCompact(rev int64) error {
  671. if rev == 0 {
  672. return nil
  673. }
  674. for _, m := range clus.Members {
  675. if err := m.CheckCompact(rev); err != nil {
  676. return err
  677. }
  678. }
  679. return nil
  680. }
  681. func (clus *Cluster) defrag() error {
  682. clus.lg.Info(
  683. "defragmenting",
  684. zap.Int("round", clus.rd),
  685. zap.Int("case", clus.cs),
  686. )
  687. for _, m := range clus.Members {
  688. if err := m.Defrag(); err != nil {
  689. clus.lg.Warn(
  690. "defrag failed",
  691. zap.Int("round", clus.rd),
  692. zap.Int("case", clus.cs),
  693. zap.Error(err),
  694. )
  695. return err
  696. }
  697. }
  698. clus.lg.Info(
  699. "defragmented",
  700. zap.Int("round", clus.rd),
  701. zap.Int("case", clus.cs),
  702. )
  703. return nil
  704. }
// Report returns the number of keys the stressers have modified so far.
func (clus *Cluster) Report() int64 { return clus.stresser.ModifiedKeys() }