cluster.go

// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tester

import (
	"context"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"math/rand"
	"net/http"
	"net/url"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"go.etcd.io/etcd/functional/rpcpb"
	"go.etcd.io/etcd/pkg/debugutil"
	"go.etcd.io/etcd/pkg/fileutil"

	"github.com/prometheus/client_golang/prometheus/promhttp"
	"go.uber.org/zap"
	"golang.org/x/time/rate"
	"google.golang.org/grpc"
)

// Cluster defines tester cluster.
type Cluster struct {
	lg *zap.Logger

	agentConns    []*grpc.ClientConn
	agentClients  []rpcpb.TransportClient
	agentStreams  []rpcpb.Transport_TransportClient
	agentRequests []*rpcpb.Request

	testerHTTPServer *http.Server

	Members []*rpcpb.Member `yaml:"agent-configs"`
	Tester  *rpcpb.Tester   `yaml:"tester-config"`

	cases []Case

	rateLimiter *rate.Limiter
	stresser    Stresser
	checkers    []Checker

	currentRevision int64
	rd              int
	cs              int
}

var dialOpts = []grpc.DialOption{
	grpc.WithInsecure(),
	grpc.WithTimeout(5 * time.Second),
	grpc.WithBlock(),
}

// NewCluster creates a client from a tester configuration.
func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
	clus, err := read(lg, fpath)
	if err != nil {
		return nil, err
	}

	clus.agentConns = make([]*grpc.ClientConn, len(clus.Members))
	clus.agentClients = make([]rpcpb.TransportClient, len(clus.Members))
	clus.agentStreams = make([]rpcpb.Transport_TransportClient, len(clus.Members))
	clus.agentRequests = make([]*rpcpb.Request, len(clus.Members))
	clus.cases = make([]Case, 0)

	for i, ap := range clus.Members {
		var err error
		clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...)
		if err != nil {
			return nil, err
		}
		clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i])
		clus.lg.Info("connected", zap.String("agent-address", ap.AgentAddr))

		clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background())
		if err != nil {
			return nil, err
		}
		clus.lg.Info("created stream", zap.String("agent-address", ap.AgentAddr))
	}

	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())
	if clus.Tester.EnablePprof {
		for p, h := range debugutil.PProfHandlers() {
			mux.Handle(p, h)
		}
	}
	clus.testerHTTPServer = &http.Server{
		Addr:     clus.Tester.Addr,
		Handler:  mux,
		ErrorLog: log.New(ioutil.Discard, "net/http", 0),
	}
	go clus.serveTesterServer()

	clus.updateCases()

	clus.rateLimiter = rate.NewLimiter(
		rate.Limit(int(clus.Tester.StressQPS)),
		int(clus.Tester.StressQPS),
	)

	clus.setStresserChecker()

	return clus, nil
}
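
// A minimal usage sketch (not part of this file): it only calls functions
// defined in this package; the logger setup and the config path "tester.yaml"
// are illustrative assumptions.
//
//	lg, _ := zap.NewProduction()
//	clus, err := NewCluster(lg, "tester.yaml") // hypothetical config path
//	if err != nil {
//		lg.Fatal("failed to create tester cluster", zap.Error(err))
//	}
//	defer clus.Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT()
//	if err = clus.Send_INITIAL_START_ETCD(); err != nil {
//		lg.Fatal("failed to bootstrap etcd cluster", zap.Error(err))
//	}
//	if err = clus.WaitHealth(); err != nil {
//		lg.Fatal("cluster did not become healthy", zap.Error(err))
//	}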

// EtcdClientEndpoints returns all etcd client endpoints.
func (clus *Cluster) EtcdClientEndpoints() (css []string) {
	css = make([]string, len(clus.Members))
	for i := range clus.Members {
		css[i] = clus.Members[i].EtcdClientEndpoint
	}
	return css
}

func (clus *Cluster) serveTesterServer() {
	clus.lg.Info(
		"started tester HTTP server",
		zap.String("tester-address", clus.Tester.Addr),
	)
	err := clus.testerHTTPServer.ListenAndServe()
	clus.lg.Info(
		"tester HTTP server returned",
		zap.String("tester-address", clus.Tester.Addr),
		zap.Error(err),
	)
	if err != nil && err != http.ErrServerClosed {
		clus.lg.Fatal("tester HTTP errored", zap.Error(err))
	}
}

func (clus *Cluster) updateCases() {
	for _, cs := range clus.Tester.Cases {
		switch cs {
		case "SIGTERM_ONE_FOLLOWER":
			clus.cases = append(clus.cases,
				new_Case_SIGTERM_ONE_FOLLOWER(clus))
		case "SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
			clus.cases = append(clus.cases,
				new_Case_SIGTERM_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus))
		case "SIGTERM_LEADER":
			clus.cases = append(clus.cases,
				new_Case_SIGTERM_LEADER(clus))
		case "SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT":
			clus.cases = append(clus.cases,
				new_Case_SIGTERM_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus))
		case "SIGTERM_QUORUM":
			clus.cases = append(clus.cases,
				new_Case_SIGTERM_QUORUM(clus))
		case "SIGTERM_ALL":
			clus.cases = append(clus.cases,
				new_Case_SIGTERM_ALL(clus))

		case "SIGQUIT_AND_REMOVE_ONE_FOLLOWER":
			clus.cases = append(clus.cases,
				new_Case_SIGQUIT_AND_REMOVE_ONE_FOLLOWER(clus))
		case "SIGQUIT_AND_REMOVE_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
			clus.cases = append(clus.cases,
				new_Case_SIGQUIT_AND_REMOVE_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus))
		case "SIGQUIT_AND_REMOVE_LEADER":
			clus.cases = append(clus.cases,
				new_Case_SIGQUIT_AND_REMOVE_LEADER(clus))
		case "SIGQUIT_AND_REMOVE_LEADER_UNTIL_TRIGGER_SNAPSHOT":
			clus.cases = append(clus.cases,
				new_Case_SIGQUIT_AND_REMOVE_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus))
		case "SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH":
			clus.cases = append(clus.cases,
				new_Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH(clus))

		case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER":
			clus.cases = append(clus.cases,
				new_Case_BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER(clus))
		case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
			clus.cases = append(clus.cases,
				new_Case_BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT())
		case "BLACKHOLE_PEER_PORT_TX_RX_LEADER":
			clus.cases = append(clus.cases,
				new_Case_BLACKHOLE_PEER_PORT_TX_RX_LEADER(clus))
		case "BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
			clus.cases = append(clus.cases,
				new_Case_BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT())
		case "BLACKHOLE_PEER_PORT_TX_RX_QUORUM":
			clus.cases = append(clus.cases,
				new_Case_BLACKHOLE_PEER_PORT_TX_RX_QUORUM(clus))
		case "BLACKHOLE_PEER_PORT_TX_RX_ALL":
			clus.cases = append(clus.cases,
				new_Case_BLACKHOLE_PEER_PORT_TX_RX_ALL(clus))

		case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
			clus.cases = append(clus.cases,
				new_Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
			clus.cases = append(clus.cases,
				new_Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER(clus, true))
		case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
			clus.cases = append(clus.cases,
				new_Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
			clus.cases = append(clus.cases,
				new_Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus, true))
		case "DELAY_PEER_PORT_TX_RX_LEADER":
			clus.cases = append(clus.cases,
				new_Case_DELAY_PEER_PORT_TX_RX_LEADER(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER":
			clus.cases = append(clus.cases,
				new_Case_DELAY_PEER_PORT_TX_RX_LEADER(clus, true))
		case "DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
			clus.cases = append(clus.cases,
				new_Case_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
			clus.cases = append(clus.cases,
				new_Case_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus, true))
		case "DELAY_PEER_PORT_TX_RX_QUORUM":
			clus.cases = append(clus.cases,
				new_Case_DELAY_PEER_PORT_TX_RX_QUORUM(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM":
			clus.cases = append(clus.cases,
				new_Case_DELAY_PEER_PORT_TX_RX_QUORUM(clus, true))
		case "DELAY_PEER_PORT_TX_RX_ALL":
			clus.cases = append(clus.cases,
				new_Case_DELAY_PEER_PORT_TX_RX_ALL(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_ALL":
			clus.cases = append(clus.cases,
				new_Case_DELAY_PEER_PORT_TX_RX_ALL(clus, true))

		case "NO_FAIL_WITH_STRESS":
			clus.cases = append(clus.cases,
				new_Case_NO_FAIL_WITH_STRESS(clus))
		case "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS":
			clus.cases = append(clus.cases,
				new_Case_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS(clus))

		case "EXTERNAL":
			clus.cases = append(clus.cases,
				new_Case_EXTERNAL(clus.Tester.ExternalExecPath))
		case "FAILPOINTS":
			fpFailures, fperr := failpointFailures(clus)
			if len(fpFailures) == 0 {
				clus.lg.Info("no failpoints found!", zap.Error(fperr))
			}
			clus.cases = append(clus.cases,
				fpFailures...)
		}
	}
}

func (clus *Cluster) listCases() (css []string) {
	css = make([]string, len(clus.cases))
	for i := range clus.cases {
		css[i] = clus.cases[i].Desc()
	}
	return css
}

// UpdateDelayLatencyMs updates the delay latency with a random value
// within the election timeout.
func (clus *Cluster) UpdateDelayLatencyMs() {
	rand.Seed(time.Now().UnixNano())
	clus.Tester.UpdatedDelayLatencyMs = uint32(rand.Int63n(clus.Members[0].Etcd.ElectionTimeoutMs))

	minLatRv := clus.Tester.DelayLatencyMsRv + clus.Tester.DelayLatencyMsRv/5
	if clus.Tester.UpdatedDelayLatencyMs <= minLatRv {
		clus.Tester.UpdatedDelayLatencyMs += minLatRv
	}
}
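
// For example (illustrative numbers, not from any shipped configuration):
// with ElectionTimeoutMs = 1000 and DelayLatencyMsRv = 50, minLatRv is
// 50 + 50/5 = 60, so a drawn latency of 60 ms or less is bumped by another
// 60 ms to keep the injected delay above the random-variance floor, while a
// larger draw is used as-is and always stays within the election timeout.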

func (clus *Cluster) setStresserChecker() {
	css := &compositeStresser{}
	lss := []*leaseStresser{}
	rss := []*runnerStresser{}
	for _, m := range clus.Members {
		sss := newStresser(clus, m)
		css.stressers = append(css.stressers, &compositeStresser{sss})
		for _, s := range sss {
			if v, ok := s.(*leaseStresser); ok {
				lss = append(lss, v)
				clus.lg.Info("added lease stresser", zap.String("endpoint", m.EtcdClientEndpoint))
			}
			if v, ok := s.(*runnerStresser); ok {
				rss = append(rss, v)
				clus.lg.Info("added runner stresser", zap.String("endpoint", m.EtcdClientEndpoint))
			}
		}
	}
	clus.stresser = css

	for _, cs := range clus.Tester.Checkers {
		switch cs {
		case "KV_HASH":
			clus.checkers = append(clus.checkers, newKVHashChecker(clus))
		case "LEASE_EXPIRE":
			for _, ls := range lss {
				clus.checkers = append(clus.checkers, newLeaseExpireChecker(ls))
			}
		case "RUNNER":
			for _, rs := range rss {
				clus.checkers = append(clus.checkers, newRunnerChecker(rs.etcdClientEndpoint, rs.errc))
			}
		case "NO_CHECK":
			clus.checkers = append(clus.checkers, newNoChecker())
		}
	}
	clus.lg.Info("updated stressers and checkers")
}

func (clus *Cluster) runCheckers(exceptions ...rpcpb.Checker) (err error) {
	defer func() {
		if err != nil {
			return
		}
		if err = clus.updateRevision(); err != nil {
			clus.lg.Warn(
				"updateRevision failed",
				zap.Error(err),
			)
			return
		}
	}()

	exs := make(map[rpcpb.Checker]struct{})
	for _, e := range exceptions {
		exs[e] = struct{}{}
	}
	for _, chk := range clus.checkers {
		clus.lg.Warn(
			"consistency check START",
			zap.String("checker", chk.Type().String()),
			zap.Strings("client-endpoints", chk.EtcdClientEndpoints()),
		)
		err = chk.Check()
		clus.lg.Warn(
			"consistency check END",
			zap.String("checker", chk.Type().String()),
			zap.Strings("client-endpoints", chk.EtcdClientEndpoints()),
			zap.Error(err),
		)
		if err != nil {
			_, ok := exs[chk.Type()]
			if !ok {
				return err
			}
			clus.lg.Warn(
				"consistency check SKIP FAIL",
				zap.String("checker", chk.Type().String()),
				zap.Strings("client-endpoints", chk.EtcdClientEndpoints()),
				zap.Error(err),
			)
		}
	}
	return nil
}
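
// A minimal sketch of how a caller inside this package might tolerate one
// checker type while failing on the rest (assuming the rpcpb.Checker enum
// exposes a LEASE_EXPIRE value, as its string form in setStresserChecker
// suggests):
//
//	// run every configured checker, but treat lease-expire failures as non-fatal
//	if err := clus.runCheckers(rpcpb.Checker_LEASE_EXPIRE); err != nil {
//		clus.lg.Fatal("consistency check failed", zap.Error(err))
//	}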

// Send_INITIAL_START_ETCD bootstraps the etcd cluster for the very first time.
// After this, just continue to call kill/restart.
func (clus *Cluster) Send_INITIAL_START_ETCD() error {
	// this is the only time a request is created from scratch
	return clus.broadcast(rpcpb.Operation_INITIAL_START_ETCD)
}

// send_SIGQUIT_ETCD_AND_ARCHIVE_DATA sends the "SIGQUIT_ETCD_AND_ARCHIVE_DATA" operation.
func (clus *Cluster) send_SIGQUIT_ETCD_AND_ARCHIVE_DATA() error {
	return clus.broadcast(rpcpb.Operation_SIGQUIT_ETCD_AND_ARCHIVE_DATA)
}

// send_RESTART_ETCD sends the restart operation.
func (clus *Cluster) send_RESTART_ETCD() error {
	return clus.broadcast(rpcpb.Operation_RESTART_ETCD)
}

func (clus *Cluster) broadcast(op rpcpb.Operation) error {
	var wg sync.WaitGroup
	wg.Add(len(clus.agentStreams))

	errc := make(chan error, len(clus.agentStreams))
	for i := range clus.agentStreams {
		go func(idx int, o rpcpb.Operation) {
			defer wg.Done()
			errc <- clus.sendOp(idx, o)
		}(i, op)
	}
	wg.Wait()
	close(errc)

	errs := []string{}
	for err := range errc {
		if err == nil {
			continue
		}

		destroyed := false
		if op == rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT {
			if err == io.EOF {
				destroyed = true
			}
			if strings.Contains(err.Error(),
				"rpc error: code = Unavailable desc = transport is closing") {
				// agent server has already closed,
				// so this error is expected
				destroyed = true
			}
			if strings.Contains(err.Error(),
				"desc = os: process already finished") {
				destroyed = true
			}
		}
		if !destroyed {
			errs = append(errs, err.Error())
		}
	}

	if len(errs) == 0 {
		return nil
	}
	return errors.New(strings.Join(errs, ", "))
}

func (clus *Cluster) sendOp(idx int, op rpcpb.Operation) error {
	_, err := clus.sendOpWithResp(idx, op)
	return err
}

func (clus *Cluster) sendOpWithResp(idx int, op rpcpb.Operation) (*rpcpb.Response, error) {
	// maintain the initial member object
	// throughout the test time
	clus.agentRequests[idx] = &rpcpb.Request{
		Operation: op,
		Member:    clus.Members[idx],
		Tester:    clus.Tester,
	}

	err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
	clus.lg.Info(
		"sent request",
		zap.String("operation", op.String()),
		zap.String("to", clus.Members[idx].EtcdClientEndpoint),
		zap.Error(err),
	)
	if err != nil {
		return nil, err
	}

	resp, err := clus.agentStreams[idx].Recv()
	if resp != nil {
		clus.lg.Info(
			"received response",
			zap.String("operation", op.String()),
			zap.String("from", clus.Members[idx].EtcdClientEndpoint),
			zap.Bool("success", resp.Success),
			zap.String("status", resp.Status),
			zap.Error(err),
		)
	} else {
		clus.lg.Info(
			"received empty response",
			zap.String("operation", op.String()),
			zap.String("from", clus.Members[idx].EtcdClientEndpoint),
			zap.Error(err),
		)
	}
	if err != nil {
		return nil, err
	}

	if !resp.Success {
		return nil, errors.New(resp.Status)
	}

	m, secure := clus.Members[idx], false
	for _, cu := range m.Etcd.AdvertiseClientURLs {
		u, perr := url.Parse(cu)
		if perr != nil {
			return nil, perr
		}
		if u.Scheme == "https" { // TODO: handle unix
			secure = true
		}
	}

	// store TLS assets from agents/servers onto disk
	if secure && (op == rpcpb.Operation_INITIAL_START_ETCD || op == rpcpb.Operation_RESTART_ETCD) {
		dirClient := filepath.Join(
			clus.Tester.DataDir,
			clus.Members[idx].Etcd.Name,
			"fixtures",
			"client",
		)
		if err = fileutil.TouchDirAll(dirClient); err != nil {
			return nil, err
		}

		clientCertData := []byte(resp.Member.ClientCertData)
		if len(clientCertData) == 0 {
			return nil, fmt.Errorf("got empty client cert from %q", m.EtcdClientEndpoint)
		}
		clientCertPath := filepath.Join(dirClient, "cert.pem")
		if err = ioutil.WriteFile(clientCertPath, clientCertData, 0644); err != nil { // overwrite if exists
			return nil, err
		}
		resp.Member.ClientCertPath = clientCertPath
		clus.lg.Info(
			"saved client cert file",
			zap.String("path", clientCertPath),
		)

		clientKeyData := []byte(resp.Member.ClientKeyData)
		if len(clientKeyData) == 0 {
			return nil, fmt.Errorf("got empty client key from %q", m.EtcdClientEndpoint)
		}
		clientKeyPath := filepath.Join(dirClient, "key.pem")
		if err = ioutil.WriteFile(clientKeyPath, clientKeyData, 0644); err != nil { // overwrite if exists
			return nil, err
		}
		resp.Member.ClientKeyPath = clientKeyPath
		clus.lg.Info(
			"saved client key file",
			zap.String("path", clientKeyPath),
		)

		clientTrustedCAData := []byte(resp.Member.ClientTrustedCAData)
		if len(clientTrustedCAData) != 0 {
			// TODO: disable this when auto TLS is deprecated
			clientTrustedCAPath := filepath.Join(dirClient, "ca.pem")
			if err = ioutil.WriteFile(clientTrustedCAPath, clientTrustedCAData, 0644); err != nil { // overwrite if exists
				return nil, err
			}
			resp.Member.ClientTrustedCAPath = clientTrustedCAPath
			clus.lg.Info(
				"saved client trusted CA file",
				zap.String("path", clientTrustedCAPath),
			)
		}

		// no need to store peer certs for tester clients
		clus.Members[idx] = resp.Member
	}

	return resp, nil
}

// Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT terminates all tester connections to agents and etcd servers.
func (clus *Cluster) Send_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() {
	err := clus.broadcast(rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT)
	if err != nil {
		clus.lg.Warn("destroying etcd/agents FAIL", zap.Error(err))
	} else {
		clus.lg.Info("destroying etcd/agents PASS")
	}

	for i, conn := range clus.agentConns {
		err := conn.Close()
		clus.lg.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
	}

	if clus.testerHTTPServer != nil {
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		err := clus.testerHTTPServer.Shutdown(ctx)
		cancel()
		clus.lg.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.Addr), zap.Error(err))
	}
}

// WaitHealth ensures all members are healthy
// by writing a test key to the etcd cluster.
func (clus *Cluster) WaitHealth() error {
	var err error
	// wait 60s to check cluster health.
	// TODO: set it to a reasonable value. It is set that high because
	// a follower may take a long time to catch up with the leader after
	// a reboot under a reasonable workload (https://github.com/etcd-io/etcd/issues/2698)
	for i := 0; i < 60; i++ {
		for _, m := range clus.Members {
			if err = m.WriteHealthKey(); err != nil {
				clus.lg.Warn(
					"health check FAIL",
					zap.Int("retries", i),
					zap.String("endpoint", m.EtcdClientEndpoint),
					zap.Error(err),
				)
				break
			}
			clus.lg.Info(
				"health check PASS",
				zap.Int("retries", i),
				zap.String("endpoint", m.EtcdClientEndpoint),
			)
		}
		if err == nil {
			clus.lg.Info("health check ALL PASS")
			return nil
		}
		time.Sleep(time.Second)
	}
	return err
}

// GetLeader returns the index of the leader and an error if any.
func (clus *Cluster) GetLeader() (int, error) {
	for i, m := range clus.Members {
		isLeader, err := m.IsLeader()
		if isLeader || err != nil {
			return i, err
		}
	}
	return 0, fmt.Errorf("no leader found")
}

// maxRev returns the maximum revision found on the cluster.
func (clus *Cluster) maxRev() (rev int64, err error) {
	ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
	defer cancel()
	revc, errc := make(chan int64, len(clus.Members)), make(chan error, len(clus.Members))
	for i := range clus.Members {
		go func(m *rpcpb.Member) {
			mrev, merr := m.Rev(ctx)
			revc <- mrev
			errc <- merr
		}(clus.Members[i])
	}
	for i := 0; i < len(clus.Members); i++ {
		if merr := <-errc; merr != nil {
			err = merr
		}
		if mrev := <-revc; mrev > rev {
			rev = mrev
		}
	}
	return rev, err
}

func (clus *Cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
	revs := make(map[string]int64)
	hashes := make(map[string]int64)
	for _, m := range clus.Members {
		rev, hash, err := m.RevHash()
		if err != nil {
			return nil, nil, err
		}
		revs[m.EtcdClientEndpoint] = rev
		hashes[m.EtcdClientEndpoint] = hash
	}
	return revs, hashes, nil
}

func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
	if rev <= 0 {
		return nil
	}

	for i, m := range clus.Members {
		clus.lg.Info(
			"compact START",
			zap.String("endpoint", m.EtcdClientEndpoint),
			zap.Int64("compact-revision", rev),
			zap.Duration("timeout", timeout),
		)
		now := time.Now()
		cerr := m.Compact(rev, timeout)
		succeed := true
		if cerr != nil {
			if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {
				clus.lg.Info(
					"compact error is ignored",
					zap.String("endpoint", m.EtcdClientEndpoint),
					zap.Int64("compact-revision", rev),
					zap.Error(cerr),
				)
			} else {
				clus.lg.Warn(
					"compact FAIL",
					zap.String("endpoint", m.EtcdClientEndpoint),
					zap.Int64("compact-revision", rev),
					zap.Error(cerr),
				)
				err = cerr
				succeed = false
			}
		}

		if succeed {
			clus.lg.Info(
				"compact PASS",
				zap.String("endpoint", m.EtcdClientEndpoint),
				zap.Int64("compact-revision", rev),
				zap.Duration("timeout", timeout),
				zap.Duration("took", time.Since(now)),
			)
		}
	}
	return err
}

func (clus *Cluster) checkCompact(rev int64) error {
	if rev == 0 {
		return nil
	}
	for _, m := range clus.Members {
		if err := m.CheckCompact(rev); err != nil {
			return err
		}
	}
	return nil
}
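
// A minimal sketch of how a test round could chain the compaction helpers
// above (the timeout value is an illustrative assumption):
//
//	rev, err := clus.maxRev()
//	if err != nil {
//		return err
//	}
//	if err = clus.compactKV(rev, 5*time.Minute); err != nil {
//		return err
//	}
//	if err = clus.checkCompact(rev); err != nil {
//		return err
//	}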

func (clus *Cluster) defrag() error {
	for _, m := range clus.Members {
		if err := m.Defrag(); err != nil {
			clus.lg.Warn(
				"defrag FAIL",
				zap.String("endpoint", m.EtcdClientEndpoint),
				zap.Error(err),
			)
			return err
		}
		clus.lg.Info(
			"defrag PASS",
			zap.String("endpoint", m.EtcdClientEndpoint),
		)
	}
	clus.lg.Info(
		"defrag ALL PASS",
		zap.Int("round", clus.rd),
		zap.Int("case", clus.cs),
		zap.Int("case-total", len(clus.cases)),
	)
	return nil
}

// GetCaseDelayDuration computes the delay duration to wait between cases.
func (clus *Cluster) GetCaseDelayDuration() time.Duration {
	return time.Duration(clus.Tester.CaseDelayMs) * time.Millisecond
}

// Report reports the number of modified keys.
func (clus *Cluster) Report() int64 {
	return clus.stresser.ModifiedKeys()
}
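
// A minimal sketch of how the pacing and reporting helpers might be used
// between cases (purely illustrative):
//
//	clus.lg.Info("modified keys so far", zap.Int64("count", clus.Report()))
//	time.Sleep(clus.GetCaseDelayDuration())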