cluster.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "io"
  20. "io/ioutil"
  21. "math/rand"
  22. "net/http"
  23. "net/url"
  24. "path/filepath"
  25. "strings"
  26. "sync"
  27. "time"
  28. "github.com/coreos/etcd/functional/rpcpb"
  29. "github.com/coreos/etcd/pkg/debugutil"
  30. "github.com/coreos/etcd/pkg/fileutil"
  31. "github.com/prometheus/client_golang/prometheus/promhttp"
  32. "go.uber.org/zap"
  33. "golang.org/x/time/rate"
  34. "google.golang.org/grpc"
  35. )
  36. // Cluster defines tester cluster.
  37. type Cluster struct {
  38. lg *zap.Logger
  39. agentConns []*grpc.ClientConn
  40. agentClients []rpcpb.TransportClient
  41. agentStreams []rpcpb.Transport_TransportClient
  42. agentRequests []*rpcpb.Request
  43. testerHTTPServer *http.Server
  44. Members []*rpcpb.Member `yaml:"agent-configs"`
  45. Tester *rpcpb.Tester `yaml:"tester-config"`
  46. cases []Case
  47. rateLimiter *rate.Limiter
  48. stresser Stresser
  49. checkers []Checker
  50. currentRevision int64
  51. rd int
  52. cs int
  53. }
  54. var dialOpts = []grpc.DialOption{
  55. grpc.WithInsecure(),
  56. grpc.WithTimeout(5 * time.Second),
  57. grpc.WithBlock(),
  58. }
  59. // NewCluster creates a client from a tester configuration.
  60. func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
  61. clus, err := read(lg, fpath)
  62. if err != nil {
  63. return nil, err
  64. }
  65. clus.agentConns = make([]*grpc.ClientConn, len(clus.Members))
  66. clus.agentClients = make([]rpcpb.TransportClient, len(clus.Members))
  67. clus.agentStreams = make([]rpcpb.Transport_TransportClient, len(clus.Members))
  68. clus.agentRequests = make([]*rpcpb.Request, len(clus.Members))
  69. clus.cases = make([]Case, 0)
  70. for i, ap := range clus.Members {
  71. var err error
  72. clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...)
  73. if err != nil {
  74. return nil, err
  75. }
  76. clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i])
  77. clus.lg.Info("connected", zap.String("agent-address", ap.AgentAddr))
  78. clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background())
  79. if err != nil {
  80. return nil, err
  81. }
  82. clus.lg.Info("created stream", zap.String("agent-address", ap.AgentAddr))
  83. }
  84. mux := http.NewServeMux()
  85. mux.Handle("/metrics", promhttp.Handler())
  86. if clus.Tester.EnablePprof {
  87. for p, h := range debugutil.PProfHandlers() {
  88. mux.Handle(p, h)
  89. }
  90. }
  91. clus.testerHTTPServer = &http.Server{
  92. Addr: clus.Tester.Addr,
  93. Handler: mux,
  94. }
  95. go clus.serveTesterServer()
  96. clus.updateCases()
  97. clus.rateLimiter = rate.NewLimiter(
  98. rate.Limit(int(clus.Tester.StressQPS)),
  99. int(clus.Tester.StressQPS),
  100. )
  101. clus.setStresserChecker()
  102. return clus, nil
  103. }
  104. // EtcdClientEndpoints returns all etcd client endpoints.
  105. func (clus *Cluster) EtcdClientEndpoints() (css []string) {
  106. css = make([]string, len(clus.Members))
  107. for i := range clus.Members {
  108. css[i] = clus.Members[i].EtcdClientEndpoint
  109. }
  110. return css
  111. }
  112. func (clus *Cluster) serveTesterServer() {
  113. clus.lg.Info(
  114. "started tester HTTP server",
  115. zap.String("tester-address", clus.Tester.Addr),
  116. )
  117. err := clus.testerHTTPServer.ListenAndServe()
  118. clus.lg.Info(
  119. "tester HTTP server returned",
  120. zap.String("tester-address", clus.Tester.Addr),
  121. zap.Error(err),
  122. )
  123. if err != nil && err != http.ErrServerClosed {
  124. clus.lg.Fatal("tester HTTP errored", zap.Error(err))
  125. }
  126. }
  127. func (clus *Cluster) updateCases() {
  128. for _, cs := range clus.Tester.Cases {
  129. switch cs {
  130. case "SIGTERM_ONE_FOLLOWER":
  131. clus.cases = append(clus.cases,
  132. new_Case_SIGTERM_ONE_FOLLOWER(clus))
  133. case "SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
  134. clus.cases = append(clus.cases,
  135. new_Case_SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus))
  136. case "SIGTERM_LEADER":
  137. clus.cases = append(clus.cases,
  138. new_Case_SIGTERM_LEADER(clus))
  139. case "SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT":
  140. clus.cases = append(clus.cases,
  141. new_Case_SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus))
  142. case "SIGTERM_QUORUM":
  143. clus.cases = append(clus.cases,
  144. new_Case_SIGTERM_QUORUM(clus))
  145. case "SIGTERM_ALL":
  146. clus.cases = append(clus.cases,
  147. new_Case_SIGTERM_ALL(clus))
  148. case "SIGQUIT_AND_REMOVE_ONE_FOLLOWER":
  149. clus.cases = append(clus.cases,
  150. new_Case_SIGQUIT_AND_REMOVE_ONE_FOLLOWER(clus))
  151. case "SIGQUIT_AND_REMOVE_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
  152. clus.cases = append(clus.cases,
  153. new_Case_SIGQUIT_AND_REMOVE_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus))
  154. case "SIGQUIT_AND_REMOVE_LEADER":
  155. clus.cases = append(clus.cases,
  156. new_Case_SIGQUIT_AND_REMOVE_LEADER(clus))
  157. case "SIGQUIT_AND_REMOVE_LEADER_UNTIL_TRIGGER_SNAPSHOT":
  158. clus.cases = append(clus.cases,
  159. new_Case_SIGQUIT_AND_REMOVE_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus))
  160. case "SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH":
  161. clus.cases = append(clus.cases,
  162. new_Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH(clus))
  163. case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER":
  164. clus.cases = append(clus.cases,
  165. new_Case_BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER(clus))
  166. case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
  167. clus.cases = append(clus.cases,
  168. new_Case_BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT())
  169. case "BLACKHOLE_PEER_PORT_TX_RX_LEADER":
  170. clus.cases = append(clus.cases,
  171. new_Case_BLACKHOLE_PEER_PORT_TX_RX_LEADER(clus))
  172. case "BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
  173. clus.cases = append(clus.cases,
  174. new_Case_BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT())
  175. case "BLACKHOLE_PEER_PORT_TX_RX_QUORUM":
  176. clus.cases = append(clus.cases,
  177. new_Case_BLACKHOLE_PEER_PORT_TX_RX_QUORUM(clus))
  178. case "BLACKHOLE_PEER_PORT_TX_RX_ALL":
  179. clus.cases = append(clus.cases,
  180. new_Case_BLACKHOLE_PEER_PORT_TX_RX_ALL(clus))
  181. case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
  182. clus.cases = append(clus.cases,
  183. new_Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER(clus, false))
  184. case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
  185. clus.cases = append(clus.cases,
  186. new_Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER(clus, true))
  187. case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
  188. clus.cases = append(clus.cases,
  189. new_Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus, false))
  190. case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
  191. clus.cases = append(clus.cases,
  192. new_Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus, true))
  193. case "DELAY_PEER_PORT_TX_RX_LEADER":
  194. clus.cases = append(clus.cases,
  195. new_Case_DELAY_PEER_PORT_TX_RX_LEADER(clus, false))
  196. case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER":
  197. clus.cases = append(clus.cases,
  198. new_Case_DELAY_PEER_PORT_TX_RX_LEADER(clus, true))
  199. case "DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
  200. clus.cases = append(clus.cases,
  201. new_Case_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus, false))
  202. case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
  203. clus.cases = append(clus.cases,
  204. new_Case_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus, true))
  205. case "DELAY_PEER_PORT_TX_RX_QUORUM":
  206. clus.cases = append(clus.cases,
  207. new_Case_DELAY_PEER_PORT_TX_RX_QUORUM(clus, false))
  208. case "RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM":
  209. clus.cases = append(clus.cases,
  210. new_Case_DELAY_PEER_PORT_TX_RX_QUORUM(clus, true))
  211. case "DELAY_PEER_PORT_TX_RX_ALL":
  212. clus.cases = append(clus.cases,
  213. new_Case_DELAY_PEER_PORT_TX_RX_ALL(clus, false))
  214. case "RANDOM_DELAY_PEER_PORT_TX_RX_ALL":
  215. clus.cases = append(clus.cases,
  216. new_Case_DELAY_PEER_PORT_TX_RX_ALL(clus, true))
  217. case "NO_FAIL_WITH_STRESS":
  218. clus.cases = append(clus.cases,
  219. new_Case_NO_FAIL_WITH_STRESS(clus))
  220. case "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS":
  221. clus.cases = append(clus.cases,
  222. new_Case_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS(clus))
  223. case "EXTERNAL":
  224. clus.cases = append(clus.cases,
  225. new_Case_EXTERNAL(clus.Tester.ExternalExecPath))
  226. case "FAILPOINTS":
  227. fpFailures, fperr := failpointFailures(clus)
  228. if len(fpFailures) == 0 {
  229. clus.lg.Info("no failpoints found!", zap.Error(fperr))
  230. }
  231. clus.cases = append(clus.cases,
  232. fpFailures...)
  233. }
  234. }
  235. }
  236. func (clus *Cluster) failureStrings() (fs []string) {
  237. fs = make([]string, len(clus.cases))
  238. for i := range clus.cases {
  239. fs[i] = clus.cases[i].Desc()
  240. }
  241. return fs
  242. }
  243. // UpdateDelayLatencyMs updates delay latency with random value
  244. // within election timeout.
  245. func (clus *Cluster) UpdateDelayLatencyMs() {
  246. rand.Seed(time.Now().UnixNano())
  247. clus.Tester.UpdatedDelayLatencyMs = uint32(rand.Int63n(clus.Members[0].Etcd.ElectionTimeoutMs))
  248. minLatRv := clus.Tester.DelayLatencyMsRv + clus.Tester.DelayLatencyMsRv/5
  249. if clus.Tester.UpdatedDelayLatencyMs <= minLatRv {
  250. clus.Tester.UpdatedDelayLatencyMs += minLatRv
  251. }
  252. }
  253. func (clus *Cluster) setStresserChecker() {
  254. css := &compositeStresser{}
  255. lss := []*leaseStresser{}
  256. rss := []*runnerStresser{}
  257. for _, m := range clus.Members {
  258. sss := newStresser(clus, m)
  259. css.stressers = append(css.stressers, &compositeStresser{sss})
  260. for _, s := range sss {
  261. if v, ok := s.(*leaseStresser); ok {
  262. lss = append(lss, v)
  263. clus.lg.Info("added lease stresser", zap.String("endpoint", m.EtcdClientEndpoint))
  264. }
  265. if v, ok := s.(*runnerStresser); ok {
  266. rss = append(rss, v)
  267. clus.lg.Info("added lease stresser", zap.String("endpoint", m.EtcdClientEndpoint))
  268. }
  269. }
  270. }
  271. clus.stresser = css
  272. for _, cs := range clus.Tester.Checkers {
  273. switch cs {
  274. case "KV_HASH":
  275. clus.checkers = append(clus.checkers, newKVHashChecker(clus))
  276. case "LEASE_EXPIRE":
  277. for _, ls := range lss {
  278. clus.checkers = append(clus.checkers, newLeaseExpireChecker(ls))
  279. }
  280. case "RUNNER":
  281. for _, rs := range rss {
  282. clus.checkers = append(clus.checkers, newRunnerChecker(rs.etcdClientEndpoint, rs.errc))
  283. }
  284. case "NO_CHECK":
  285. clus.checkers = append(clus.checkers, newNoChecker())
  286. }
  287. }
  288. clus.lg.Info("updated stressers")
  289. }
  290. func (clus *Cluster) runCheckers(exceptions ...rpcpb.Checker) (err error) {
  291. defer func() {
  292. if err != nil {
  293. return
  294. }
  295. if err = clus.updateRevision(); err != nil {
  296. clus.lg.Warn(
  297. "updateRevision failed",
  298. zap.Error(err),
  299. )
  300. return
  301. }
  302. }()
  303. exs := make(map[rpcpb.Checker]struct{})
  304. for _, e := range exceptions {
  305. exs[e] = struct{}{}
  306. }
  307. for _, chk := range clus.checkers {
  308. clus.lg.Warn(
  309. "consistency check START",
  310. zap.String("checker", chk.Type().String()),
  311. zap.Strings("client-endpoints", chk.EtcdClientEndpoints()),
  312. )
  313. err = chk.Check()
  314. clus.lg.Warn(
  315. "consistency check END",
  316. zap.String("checker", chk.Type().String()),
  317. zap.Strings("client-endpoints", chk.EtcdClientEndpoints()),
  318. zap.Error(err),
  319. )
  320. if err != nil {
  321. _, ok := exs[chk.Type()]
  322. if !ok {
  323. return err
  324. }
  325. clus.lg.Warn(
  326. "consistency check SKIP FAIL",
  327. zap.String("checker", chk.Type().String()),
  328. zap.Strings("client-endpoints", chk.EtcdClientEndpoints()),
  329. zap.Error(err),
  330. )
  331. }
  332. }
  333. return nil
  334. }
  335. // Send_INITIAL_START_ETCD bootstraps etcd cluster the very first time.
  336. // After this, just continue to call kill/restart.
  337. func (clus *Cluster) Send_INITIAL_START_ETCD() error {
  338. // this is the only time that creates request from scratch
  339. return clus.broadcast(rpcpb.Operation_INITIAL_START_ETCD)
  340. }
  341. // send_SIGQUIT_ETCD_AND_ARCHIVE_DATA sends "send_SIGQUIT_ETCD_AND_ARCHIVE_DATA" operation.
  342. func (clus *Cluster) send_SIGQUIT_ETCD_AND_ARCHIVE_DATA() error {
  343. return clus.broadcast(rpcpb.Operation_SIGQUIT_ETCD_AND_ARCHIVE_DATA)
  344. }
  345. // send_RESTART_ETCD sends restart operation.
  346. func (clus *Cluster) send_RESTART_ETCD() error {
  347. return clus.broadcast(rpcpb.Operation_RESTART_ETCD)
  348. }
  349. func (clus *Cluster) broadcast(op rpcpb.Operation) error {
  350. var wg sync.WaitGroup
  351. wg.Add(len(clus.agentStreams))
  352. errc := make(chan error, len(clus.agentStreams))
  353. for i := range clus.agentStreams {
  354. go func(idx int, o rpcpb.Operation) {
  355. defer wg.Done()
  356. errc <- clus.sendOp(idx, o)
  357. }(i, op)
  358. }
  359. wg.Wait()
  360. close(errc)
  361. errs := []string{}
  362. for err := range errc {
  363. if err == nil {
  364. continue
  365. }
  366. if err != nil {
  367. destroyed := false
  368. if op == rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT {
  369. if err == io.EOF {
  370. destroyed = true
  371. }
  372. if strings.Contains(err.Error(),
  373. "rpc error: code = Unavailable desc = transport is closing") {
  374. // agent server has already closed;
  375. // so this error is expected
  376. destroyed = true
  377. }
  378. if strings.Contains(err.Error(),
  379. "desc = os: process already finished") {
  380. destroyed = true
  381. }
  382. }
  383. if !destroyed {
  384. errs = append(errs, err.Error())
  385. }
  386. }
  387. }
  388. if len(errs) == 0 {
  389. return nil
  390. }
  391. return errors.New(strings.Join(errs, ", "))
  392. }
  393. func (clus *Cluster) sendOp(idx int, op rpcpb.Operation) error {
  394. _, err := clus.sendOpWithResp(idx, op)
  395. return err
  396. }
  397. func (clus *Cluster) sendOpWithResp(idx int, op rpcpb.Operation) (*rpcpb.Response, error) {
  398. // maintain the initial member object
  399. // throughout the test time
  400. clus.agentRequests[idx] = &rpcpb.Request{
  401. Operation: op,
  402. Member: clus.Members[idx],
  403. Tester: clus.Tester,
  404. }
  405. err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
  406. clus.lg.Info(
  407. "sent request",
  408. zap.String("operation", op.String()),
  409. zap.String("to", clus.Members[idx].EtcdClientEndpoint),
  410. zap.Error(err),
  411. )
  412. if err != nil {
  413. return nil, err
  414. }
  415. resp, err := clus.agentStreams[idx].Recv()
  416. if resp != nil {
  417. clus.lg.Info(
  418. "received response",
  419. zap.String("operation", op.String()),
  420. zap.String("from", clus.Members[idx].EtcdClientEndpoint),
  421. zap.Bool("success", resp.Success),
  422. zap.String("status", resp.Status),
  423. zap.Error(err),
  424. )
  425. } else {
  426. clus.lg.Info(
  427. "received empty response",
  428. zap.String("operation", op.String()),
  429. zap.String("from", clus.Members[idx].EtcdClientEndpoint),
  430. zap.Error(err),
  431. )
  432. }
  433. if err != nil {
  434. return nil, err
  435. }
  436. if !resp.Success {
  437. return nil, errors.New(resp.Status)
  438. }
  439. m, secure := clus.Members[idx], false
  440. for _, cu := range m.Etcd.AdvertiseClientURLs {
  441. u, err := url.Parse(cu)
  442. if err != nil {
  443. return nil, err
  444. }
  445. if u.Scheme == "https" { // TODO: handle unix
  446. secure = true
  447. }
  448. }
  449. // store TLS assets from agents/servers onto disk
  450. if secure && (op == rpcpb.Operation_INITIAL_START_ETCD || op == rpcpb.Operation_RESTART_ETCD) {
  451. dirClient := filepath.Join(
  452. clus.Tester.DataDir,
  453. clus.Members[idx].Etcd.Name,
  454. "fixtures",
  455. "client",
  456. )
  457. if err = fileutil.TouchDirAll(dirClient); err != nil {
  458. return nil, err
  459. }
  460. clientCertData := []byte(resp.Member.ClientCertData)
  461. if len(clientCertData) == 0 {
  462. return nil, fmt.Errorf("got empty client cert from %q", m.EtcdClientEndpoint)
  463. }
  464. clientCertPath := filepath.Join(dirClient, "cert.pem")
  465. if err = ioutil.WriteFile(clientCertPath, clientCertData, 0644); err != nil { // overwrite if exists
  466. return nil, err
  467. }
  468. resp.Member.ClientCertPath = clientCertPath
  469. clus.lg.Info(
  470. "saved client cert file",
  471. zap.String("path", clientCertPath),
  472. )
  473. clientKeyData := []byte(resp.Member.ClientKeyData)
  474. if len(clientKeyData) == 0 {
  475. return nil, fmt.Errorf("got empty client key from %q", m.EtcdClientEndpoint)
  476. }
  477. clientKeyPath := filepath.Join(dirClient, "key.pem")
  478. if err = ioutil.WriteFile(clientKeyPath, clientKeyData, 0644); err != nil { // overwrite if exists
  479. return nil, err
  480. }
  481. resp.Member.ClientKeyPath = clientKeyPath
  482. clus.lg.Info(
  483. "saved client key file",
  484. zap.String("path", clientKeyPath),
  485. )
  486. clientTrustedCAData := []byte(resp.Member.ClientTrustedCAData)
  487. if len(clientTrustedCAData) != 0 {
  488. // TODO: disable this when auto TLS is deprecated
  489. clientTrustedCAPath := filepath.Join(dirClient, "ca.pem")
  490. if err = ioutil.WriteFile(clientTrustedCAPath, clientTrustedCAData, 0644); err != nil { // overwrite if exists
  491. return nil, err
  492. }
  493. resp.Member.ClientTrustedCAPath = clientTrustedCAPath
  494. clus.lg.Info(
  495. "saved client trusted CA file",
  496. zap.String("path", clientTrustedCAPath),
  497. )
  498. }
  499. // no need to store peer certs for tester clients
  500. clus.Members[idx] = resp.Member
  501. }
  502. return resp, nil
  503. }
  504. // Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT terminates all tester connections to agents and etcd servers.
  505. func (clus *Cluster) Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() {
  506. err := clus.broadcast(rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT)
  507. if err != nil {
  508. clus.lg.Warn("destroying etcd/agents FAIL", zap.Error(err))
  509. } else {
  510. clus.lg.Info("destroying etcd/agents PASS")
  511. }
  512. for i, conn := range clus.agentConns {
  513. err := conn.Close()
  514. clus.lg.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
  515. }
  516. if clus.testerHTTPServer != nil {
  517. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  518. err := clus.testerHTTPServer.Shutdown(ctx)
  519. cancel()
  520. clus.lg.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.Addr), zap.Error(err))
  521. }
  522. }
  523. // WaitHealth ensures all members are healthy
  524. // by writing a test key to etcd cluster.
  525. func (clus *Cluster) WaitHealth() error {
  526. var err error
  527. // wait 60s to check cluster health.
  528. // TODO: set it to a reasonable value. It is set that high because
  529. // follower may use long time to catch up the leader when reboot under
  530. // reasonable workload (https://github.com/coreos/etcd/issues/2698)
  531. for i := 0; i < 60; i++ {
  532. for _, m := range clus.Members {
  533. if err = m.WriteHealthKey(); err != nil {
  534. clus.lg.Warn(
  535. "health check FAIL",
  536. zap.Int("retries", i),
  537. zap.String("endpoint", m.EtcdClientEndpoint),
  538. zap.Error(err),
  539. )
  540. break
  541. }
  542. clus.lg.Info(
  543. "health check PASS",
  544. zap.Int("retries", i),
  545. zap.String("endpoint", m.EtcdClientEndpoint),
  546. )
  547. }
  548. if err == nil {
  549. clus.lg.Info("health check ALL PASS")
  550. return nil
  551. }
  552. time.Sleep(time.Second)
  553. }
  554. return err
  555. }
  556. // GetLeader returns the index of leader and error if any.
  557. func (clus *Cluster) GetLeader() (int, error) {
  558. for i, m := range clus.Members {
  559. isLeader, err := m.IsLeader()
  560. if isLeader || err != nil {
  561. return i, err
  562. }
  563. }
  564. return 0, fmt.Errorf("no leader found")
  565. }
  566. // maxRev returns the maximum revision found on the cluster.
  567. func (clus *Cluster) maxRev() (rev int64, err error) {
  568. ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
  569. defer cancel()
  570. revc, errc := make(chan int64, len(clus.Members)), make(chan error, len(clus.Members))
  571. for i := range clus.Members {
  572. go func(m *rpcpb.Member) {
  573. mrev, merr := m.Rev(ctx)
  574. revc <- mrev
  575. errc <- merr
  576. }(clus.Members[i])
  577. }
  578. for i := 0; i < len(clus.Members); i++ {
  579. if merr := <-errc; merr != nil {
  580. err = merr
  581. }
  582. if mrev := <-revc; mrev > rev {
  583. rev = mrev
  584. }
  585. }
  586. return rev, err
  587. }
  588. func (clus *Cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
  589. revs := make(map[string]int64)
  590. hashes := make(map[string]int64)
  591. for _, m := range clus.Members {
  592. rev, hash, err := m.RevHash()
  593. if err != nil {
  594. return nil, nil, err
  595. }
  596. revs[m.EtcdClientEndpoint] = rev
  597. hashes[m.EtcdClientEndpoint] = hash
  598. }
  599. return revs, hashes, nil
  600. }
  601. func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
  602. if rev <= 0 {
  603. return nil
  604. }
  605. for i, m := range clus.Members {
  606. clus.lg.Info(
  607. "compact START",
  608. zap.String("endpoint", m.EtcdClientEndpoint),
  609. zap.Int64("compact-revision", rev),
  610. zap.Duration("timeout", timeout),
  611. )
  612. now := time.Now()
  613. cerr := m.Compact(rev, timeout)
  614. succeed := true
  615. if cerr != nil {
  616. if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {
  617. clus.lg.Info(
  618. "compact error is ignored",
  619. zap.String("endpoint", m.EtcdClientEndpoint),
  620. zap.Int64("compact-revision", rev),
  621. zap.Error(cerr),
  622. )
  623. } else {
  624. clus.lg.Warn(
  625. "compact FAIL",
  626. zap.String("endpoint", m.EtcdClientEndpoint),
  627. zap.Int64("compact-revision", rev),
  628. zap.Error(cerr),
  629. )
  630. err = cerr
  631. succeed = false
  632. }
  633. }
  634. if succeed {
  635. clus.lg.Info(
  636. "compact PASS",
  637. zap.String("endpoint", m.EtcdClientEndpoint),
  638. zap.Int64("compact-revision", rev),
  639. zap.Duration("timeout", timeout),
  640. zap.Duration("took", time.Since(now)),
  641. )
  642. }
  643. }
  644. return err
  645. }
  646. func (clus *Cluster) checkCompact(rev int64) error {
  647. if rev == 0 {
  648. return nil
  649. }
  650. for _, m := range clus.Members {
  651. if err := m.CheckCompact(rev); err != nil {
  652. return err
  653. }
  654. }
  655. return nil
  656. }
  657. func (clus *Cluster) defrag() error {
  658. for _, m := range clus.Members {
  659. if err := m.Defrag(); err != nil {
  660. clus.lg.Warn(
  661. "defrag FAIL",
  662. zap.String("endpoint", m.EtcdClientEndpoint),
  663. zap.Error(err),
  664. )
  665. return err
  666. }
  667. clus.lg.Info(
  668. "defrag PASS",
  669. zap.String("endpoint", m.EtcdClientEndpoint),
  670. )
  671. }
  672. clus.lg.Info(
  673. "defrag ALL PASS",
  674. zap.Int("round", clus.rd),
  675. zap.Int("case", clus.cs),
  676. zap.Int("case-total", len(clus.cases)),
  677. )
  678. return nil
  679. }
  680. // GetCaseDelayDuration computes failure delay duration.
  681. func (clus *Cluster) GetCaseDelayDuration() time.Duration {
  682. return time.Duration(clus.Tester.CaseDelayMs) * time.Millisecond
  683. }
  684. // Report reports the number of modified keys.
  685. func (clus *Cluster) Report() int64 {
  686. return clus.stresser.ModifiedKeys()
  687. }