// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tester

import (
	"context"
	"errors"
	"fmt"
	"io/ioutil"
	"math/rand"
	"net/http"
	"path/filepath"
	"strings"
	"time"

	"github.com/coreos/etcd/pkg/debugutil"
	"github.com/coreos/etcd/tools/functional-tester/rpcpb"

	"github.com/prometheus/client_golang/prometheus/promhttp"
	"go.uber.org/zap"
	"golang.org/x/time/rate"
	"google.golang.org/grpc"
	yaml "gopkg.in/yaml.v2"
)

// Cluster defines tester cluster.
type Cluster struct {
	lg *zap.Logger

	agentConns    []*grpc.ClientConn
	agentClients  []rpcpb.TransportClient
	agentStreams  []rpcpb.Transport_TransportClient
	agentRequests []*rpcpb.Request

	testerHTTPServer *http.Server

	Members []*rpcpb.Member `yaml:"agent-configs"`
	Tester  *rpcpb.Tester   `yaml:"tester-config"`

	failures    []Failure
	rateLimiter *rate.Limiter
	stresser    Stresser
	checker     Checker

	currentRevision int64
	rd              int
	cs              int
}

// newCluster reads the tester configuration at fpath and validates
// every member and tester field before returning the cluster.
func newCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
	bts, err := ioutil.ReadFile(fpath)
	if err != nil {
		return nil, err
	}
	lg.Info("opened configuration file", zap.String("path", fpath))

	clus := &Cluster{lg: lg}
	if err = yaml.Unmarshal(bts, clus); err != nil {
		return nil, err
	}

	for i := range clus.Members {
		if clus.Members[i].BaseDir == "" {
			return nil, fmt.Errorf("Members[%d].BaseDir cannot be empty (got %q)", i, clus.Members[i].BaseDir)
		}
		if clus.Members[i].EtcdLogPath == "" {
			return nil, fmt.Errorf("Members[%d].EtcdLogPath cannot be empty (got %q)", i, clus.Members[i].EtcdLogPath)
		}

		if clus.Members[i].Etcd.Name == "" {
			return nil, fmt.Errorf("'--name' cannot be empty (got %+v)", clus.Members[i])
		}
		if clus.Members[i].Etcd.DataDir == "" {
			return nil, fmt.Errorf("'--data-dir' cannot be empty (got %+v)", clus.Members[i])
		}
		if clus.Members[i].Etcd.SnapshotCount == 0 {
			return nil, fmt.Errorf("'--snapshot-count' cannot be 0 (got %+v)", clus.Members[i].Etcd.SnapshotCount)
		}
		if clus.Members[i].Etcd.WALDir == "" {
			clus.Members[i].Etcd.WALDir = filepath.Join(clus.Members[i].Etcd.DataDir, "member", "wal")
		}

		if clus.Members[i].Etcd.HeartbeatIntervalMs == 0 {
			return nil, fmt.Errorf("'--heartbeat-interval' cannot be 0 (got %+v)", clus.Members[i].Etcd)
		}
		if clus.Members[i].Etcd.ElectionTimeoutMs == 0 {
			return nil, fmt.Errorf("'--election-timeout' cannot be 0 (got %+v)", clus.Members[i].Etcd)
		}
		if int64(clus.Tester.DelayLatencyMs) <= clus.Members[i].Etcd.ElectionTimeoutMs {
			return nil, fmt.Errorf("delay latency %d ms must be greater than election timeout %d ms", clus.Tester.DelayLatencyMs, clus.Members[i].Etcd.ElectionTimeoutMs)
		}

		port := ""
		listenClientPorts := make([]string, len(clus.Members))
		for i, u := range clus.Members[i].Etcd.ListenClientURLs {
			if !isValidURL(u) {
				return nil, fmt.Errorf("'--listen-client-urls' has invalid URL %q", u)
			}
			listenClientPorts[i], err = getPort(u)
			if err != nil {
				return nil, fmt.Errorf("'--listen-client-urls' has no port %q", u)
			}
		}
		for i, u := range clus.Members[i].Etcd.AdvertiseClientURLs {
			if !isValidURL(u) {
				return nil, fmt.Errorf("'--advertise-client-urls' has invalid URL %q", u)
			}
			port, err = getPort(u)
			if err != nil {
				return nil, fmt.Errorf("'--advertise-client-urls' has no port %q", u)
			}
			if clus.Members[i].EtcdClientProxy && listenClientPorts[i] == port {
				return nil, fmt.Errorf("clus.Members[%d] requires client port proxy, but advertise port %q conflicts with listener port %q", i, port, listenClientPorts[i])
			}
		}
		listenPeerPorts := make([]string, len(clus.Members))
		for i, u := range clus.Members[i].Etcd.ListenPeerURLs {
			if !isValidURL(u) {
				return nil, fmt.Errorf("'--listen-peer-urls' has invalid URL %q", u)
			}
			listenPeerPorts[i], err = getPort(u)
			if err != nil {
				return nil, fmt.Errorf("'--listen-peer-urls' has no port %q", u)
			}
		}
		for i, u := range clus.Members[i].Etcd.InitialAdvertisePeerURLs {
			if !isValidURL(u) {
				return nil, fmt.Errorf("'--initial-advertise-peer-urls' has invalid URL %q", u)
			}
			port, err = getPort(u)
			if err != nil {
				return nil, fmt.Errorf("'--initial-advertise-peer-urls' has no port %q", u)
			}
			if clus.Members[i].EtcdPeerProxy && listenPeerPorts[i] == port {
				return nil, fmt.Errorf("clus.Members[%d] requires peer port proxy, but advertise port %q conflicts with listener port %q", i, port, listenPeerPorts[i])
			}
		}

		if !strings.HasPrefix(clus.Members[i].EtcdLogPath, clus.Members[i].BaseDir) {
			return nil, fmt.Errorf("EtcdLogPath must be prefixed with BaseDir (got %q)", clus.Members[i].EtcdLogPath)
		}
		if !strings.HasPrefix(clus.Members[i].Etcd.DataDir, clus.Members[i].BaseDir) {
			return nil, fmt.Errorf("Etcd.DataDir must be prefixed with BaseDir (got %q)", clus.Members[i].Etcd.DataDir)
		}
		// TODO: support separate WALDir that can be handled via failure-archive
		if !strings.HasPrefix(clus.Members[i].Etcd.WALDir, clus.Members[i].BaseDir) {
			return nil, fmt.Errorf("Etcd.WALDir must be prefixed with BaseDir (got %q)", clus.Members[i].Etcd.WALDir)
		}

		if len(clus.Tester.FailureCases) == 0 {
			return nil, errors.New("FailureCases not found")
		}
	}

	if clus.Tester.DelayLatencyMs <= clus.Tester.DelayLatencyMsRv*5 {
		return nil, fmt.Errorf("delay latency %d ms must be greater than 5x of delay latency random variable %d ms", clus.Tester.DelayLatencyMs, clus.Tester.DelayLatencyMsRv)
	}
	if clus.Tester.UpdatedDelayLatencyMs == 0 {
		clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
	}

	for _, v := range clus.Tester.FailureCases {
		if _, ok := rpcpb.FailureCase_value[v]; !ok {
			return nil, fmt.Errorf("%q is not defined in 'rpcpb.FailureCase_value'", v)
		}
	}
	for _, v := range clus.Tester.StressTypes {
		if _, ok := rpcpb.StressType_value[v]; !ok {
			return nil, fmt.Errorf("StressType is unknown; got %q", v)
		}
	}
	if clus.Tester.StressKeySuffixRangeTxn > 100 {
		return nil, fmt.Errorf("StressKeySuffixRangeTxn maximum value is 100, got %v", clus.Tester.StressKeySuffixRangeTxn)
	}
	if clus.Tester.StressKeyTxnOps > 64 {
		return nil, fmt.Errorf("StressKeyTxnOps maximum value is 64, got %v", clus.Tester.StressKeyTxnOps)
	}

	return clus, err
}

// TODO: status handler

var dialOpts = []grpc.DialOption{
	grpc.WithInsecure(),
	grpc.WithTimeout(5 * time.Second),
	grpc.WithBlock(),
}

// NewCluster creates a cluster client from a tester configuration,
// connecting to every agent and starting the tester HTTP server.
func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
	clus, err := newCluster(lg, fpath)
	if err != nil {
		return nil, err
	}

	clus.agentConns = make([]*grpc.ClientConn, len(clus.Members))
	clus.agentClients = make([]rpcpb.TransportClient, len(clus.Members))
	clus.agentStreams = make([]rpcpb.Transport_TransportClient, len(clus.Members))
	clus.agentRequests = make([]*rpcpb.Request, len(clus.Members))
	clus.failures = make([]Failure, 0)

	for i, ap := range clus.Members {
		var err error
		clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...)
		if err != nil {
			return nil, err
		}
		clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i])
		clus.lg.Info("connected", zap.String("agent-address", ap.AgentAddr))

		clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background())
		if err != nil {
			return nil, err
		}
		clus.lg.Info("created stream", zap.String("agent-address", ap.AgentAddr))
	}

	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())
	if clus.Tester.EnablePprof {
		for p, h := range debugutil.PProfHandlers() {
			mux.Handle(p, h)
		}
	}
	clus.testerHTTPServer = &http.Server{
		Addr:    clus.Tester.TesterAddr,
		Handler: mux,
	}
	go clus.serveTesterServer()

	clus.updateFailures()

	clus.rateLimiter = rate.NewLimiter(
		rate.Limit(int(clus.Tester.StressQPS)),
		int(clus.Tester.StressQPS),
	)

	clus.updateStresserChecker()

	return clus, nil
}

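// A minimal usage sketch (hypothetical; the real entrypoint lives in the
// tester's command package, and "config.yaml" is an assumed path):
//
//	clus, err := NewCluster(zap.NewExample(), "config.yaml")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer clus.DestroyEtcdAgents()
//	if err = clus.Bootstrap(); err != nil {
//		log.Fatal(err)
//	}

// serveTesterServer serves the tester's "/metrics" endpoint (plus pprof
// handlers when enabled) until the HTTP server is shut down.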
func (clus *Cluster) serveTesterServer() {
	clus.lg.Info(
		"started tester HTTP server",
		zap.String("tester-address", clus.Tester.TesterAddr),
	)
	err := clus.testerHTTPServer.ListenAndServe()
	clus.lg.Info(
		"tester HTTP server returned",
		zap.String("tester-address", clus.Tester.TesterAddr),
		zap.Error(err),
	)
	if err != nil && err != http.ErrServerClosed {
		clus.lg.Fatal("tester HTTP errored", zap.Error(err))
	}
}

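// updateFailures translates each configured failure-case name in
// Tester.FailureCases into its Failure implementation.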
func (clus *Cluster) updateFailures() {
	for _, cs := range clus.Tester.FailureCases {
		switch cs {
		case "KILL_ONE_FOLLOWER":
			clus.failures = append(clus.failures, newFailureKillOneFollower())
		case "KILL_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
			clus.failures = append(clus.failures, newFailureKillOneFollowerUntilTriggerSnapshot())
		case "KILL_LEADER":
			clus.failures = append(clus.failures, newFailureKillLeader())
		case "KILL_LEADER_UNTIL_TRIGGER_SNAPSHOT":
			clus.failures = append(clus.failures, newFailureKillLeaderUntilTriggerSnapshot())
		case "KILL_QUORUM":
			clus.failures = append(clus.failures, newFailureKillQuorum())
		case "KILL_ALL":
			clus.failures = append(clus.failures, newFailureKillAll())
		case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER":
			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollower(clus))
		case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollowerUntilTriggerSnapshot())
		case "BLACKHOLE_PEER_PORT_TX_RX_LEADER":
			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeader(clus))
		case "BLACKHOLE_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeaderUntilTriggerSnapshot())
		case "BLACKHOLE_PEER_PORT_TX_RX_QUORUM":
			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxQuorum(clus))
		case "BLACKHOLE_PEER_PORT_TX_RX_ALL":
			clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxAll(clus))
		case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus, true))
		case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollowerUntilTriggerSnapshot(clus, true))
		case "DELAY_PEER_PORT_TX_RX_LEADER":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus, true))
		case "DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeaderUntilTriggerSnapshot(clus, true))
		case "DELAY_PEER_PORT_TX_RX_QUORUM":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxQuorum(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxQuorum(clus, true))
		case "DELAY_PEER_PORT_TX_RX_ALL":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus, false))
		case "RANDOM_DELAY_PEER_PORT_TX_RX_ALL":
			clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus, true))
		case "NO_FAIL_WITH_STRESS":
			clus.failures = append(clus.failures, newFailureNoFailWithStress(clus))
		case "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS":
			clus.failures = append(clus.failures, newFailureNoFailWithNoStressForLiveness(clus))
		case "EXTERNAL":
			clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath))
		case "FAILPOINTS":
			fpFailures, fperr := failpointFailures(clus)
			if len(fpFailures) == 0 {
				clus.lg.Info("no failpoints found!", zap.Error(fperr))
			}
			clus.failures = append(clus.failures, fpFailures...)
		}
	}
}

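// failureStrings returns the descriptions of all configured failures.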
func (clus *Cluster) failureStrings() (fs []string) {
	fs = make([]string, len(clus.failures))
	for i := range clus.failures {
		fs[i] = clus.failures[i].Desc()
	}
	return fs
}

// UpdateDelayLatencyMs updates the delay latency with a random value
// within the election timeout.
func (clus *Cluster) UpdateDelayLatencyMs() {
	rand.Seed(time.Now().UnixNano())
	clus.Tester.UpdatedDelayLatencyMs = uint32(rand.Int63n(clus.Members[0].Etcd.ElectionTimeoutMs))

	minLatRv := clus.Tester.DelayLatencyMsRv + clus.Tester.DelayLatencyMsRv/5
	if clus.Tester.UpdatedDelayLatencyMs <= minLatRv {
		clus.Tester.UpdatedDelayLatencyMs += minLatRv
	}
}

func (clus *Cluster) shuffleFailures() {
	rand.Seed(time.Now().UnixNano())
	offset := rand.Intn(1000)
	n := len(clus.failures)
	cp := coprime(n)

	fs := make([]Failure, n)
	for i := 0; i < n; i++ {
		fs[i] = clus.failures[(cp*i+offset)%n]
	}
	clus.failures = fs
	clus.lg.Info("shuffled test failure cases", zap.Int("total", n))
}

/*
x and y with GCD 1 are coprime to each other.

	x1 = (coprime of n * idx1 + offset) % n
	x2 = (coprime of n * idx2 + offset) % n
	(x2 - x1) % n = (coprime of n * (idx2 - idx1)) % n

Since gcd(coprime of n, n) == 1, the right-hand side is zero only when
(idx2 - idx1) is a multiple of n, so consecutive indices
(idx2 - idx1 == 1) are guaranteed to map to distinct positions.
*/

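// A hypothetical walk-through (not from the original source): with n = 5,
// the loop below picks coprime = 2 (the first i in [n/2, n) with
// gcd(i, n) == 1), and with offset = 2 the shuffle visits indices
// (2*i + 2) % 5 = 2, 4, 1, 3, 0 -- a permutation with no repeats.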
func coprime(n int) int {
	coprime := 1
	for i := n / 2; i < n; i++ {
		if gcd(i, n) == 1 {
			coprime = i
			break
		}
	}
	return coprime
}

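// gcd returns the greatest common divisor of x and y,
// via Euclid's algorithm.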
func gcd(x, y int) int {
	if y == 0 {
		return x
	}
	return gcd(y, x%y)
}

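// updateStresserChecker rebuilds the composite stresser over all members
// and the checker: hash-based when consistency checking is enabled,
// a no-op checker otherwise.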
func (clus *Cluster) updateStresserChecker() {
	cs := &compositeStresser{}
	for _, m := range clus.Members {
		cs.stressers = append(cs.stressers, newStresser(clus, m))
	}
	clus.stresser = cs

	if clus.Tester.ConsistencyCheck {
		clus.checker = newHashChecker(clus.lg, hashAndRevGetter(clus))
		if schk := cs.Checker(); schk != nil {
			clus.checker = newCompositeChecker([]Checker{clus.checker, schk})
		}
	} else {
		clus.checker = newNoChecker()
	}

	clus.lg.Info(
		"updated stressers",
		zap.Int("round", clus.rd),
		zap.Int("case", clus.cs),
	)
}

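// checkConsistency runs the configured checker across the cluster and,
// when the check passes, refreshes the tester's current revision.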
func (clus *Cluster) checkConsistency() (err error) {
	defer func() {
		if err != nil {
			return
		}
		if err = clus.updateRevision(); err != nil {
			clus.lg.Warn(
				"updateRevision failed",
				zap.Error(err),
			)
			return
		}
	}()

	if err = clus.checker.Check(); err != nil {
		clus.lg.Warn(
			"consistency check FAIL",
			zap.Int("round", clus.rd),
			zap.Int("case", clus.cs),
			zap.Error(err),
		)
		return err
	}
	clus.lg.Info(
		"consistency check ALL PASS",
		zap.Int("round", clus.rd),
		zap.Int("case", clus.cs),
		zap.String("desc", clus.failures[clus.cs].Desc()),
	)
	return err
}

// Bootstrap bootstraps the etcd cluster the very first time.
// After this, just continue to call kill/restart.
func (clus *Cluster) Bootstrap() error {
	// this is the only time a request is created from scratch
	return clus.broadcastOperation(rpcpb.Operation_InitialStartEtcd)
}

// FailArchive sends the "FailArchive" operation.
func (clus *Cluster) FailArchive() error {
	return clus.broadcastOperation(rpcpb.Operation_FailArchive)
}

// Restart sends the "Restart" operation.
func (clus *Cluster) Restart() error {
	return clus.broadcastOperation(rpcpb.Operation_RestartEtcd)
}

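// broadcastOperation sends op to every agent stream in order. A
// "transport is closing" error during DestroyEtcdAgent is treated as
// success, since the agent shuts down before the RPC can complete.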
func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error {
	for i := range clus.agentStreams {
		err := clus.sendOperation(i, op)
		if err != nil {
			if op == rpcpb.Operation_DestroyEtcdAgent &&
				strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") {
				// the agent server has already closed,
				// so this error is expected
				clus.lg.Info(
					"successfully destroyed",
					zap.String("member", clus.Members[i].EtcdClientEndpoint),
				)
				continue
			}
			return err
		}
	}
	return nil
}

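// sendOperation sends op to the agent at index idx.
// Operation_InitialStartEtcd builds the request from scratch; every later
// operation reuses the cached request, updating only its Operation field.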
func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
	if op == rpcpb.Operation_InitialStartEtcd {
		clus.agentRequests[idx] = &rpcpb.Request{
			Operation: op,
			Member:    clus.Members[idx],
			Tester:    clus.Tester,
		}
	} else {
		clus.agentRequests[idx].Operation = op
	}

	err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
	clus.lg.Info(
		"sent request",
		zap.String("operation", op.String()),
		zap.String("to", clus.Members[idx].EtcdClientEndpoint),
		zap.Error(err),
	)
	if err != nil {
		return err
	}

	resp, err := clus.agentStreams[idx].Recv()
	if resp != nil {
		clus.lg.Info(
			"received response",
			zap.String("operation", op.String()),
			zap.String("from", clus.Members[idx].EtcdClientEndpoint),
			zap.Bool("success", resp.Success),
			zap.String("status", resp.Status),
			zap.Error(err),
		)
	} else {
		clus.lg.Info(
			"received empty response",
			zap.String("operation", op.String()),
			zap.String("from", clus.Members[idx].EtcdClientEndpoint),
			zap.Error(err),
		)
	}
	if err != nil {
		return err
	}

	if !resp.Success {
		err = errors.New(resp.Status)
	}
	return err
}

// DestroyEtcdAgents terminates all tester connections to agents and etcd servers.
func (clus *Cluster) DestroyEtcdAgents() {
	err := clus.broadcastOperation(rpcpb.Operation_DestroyEtcdAgent)
	if err != nil {
		clus.lg.Warn("destroying etcd/agents FAIL", zap.Error(err))
	} else {
		clus.lg.Info("destroying etcd/agents PASS")
	}

	for i, conn := range clus.agentConns {
		err := conn.Close()
		clus.lg.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
	}

	if clus.testerHTTPServer != nil {
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		err := clus.testerHTTPServer.Shutdown(ctx)
		cancel()
		clus.lg.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr), zap.Error(err))
	}
}

// WaitHealth ensures all members are healthy
// by writing a test key to the etcd cluster.
func (clus *Cluster) WaitHealth() error {
	var err error
	// wait 60s to check cluster health.
	// TODO: set it to a reasonable value. It is set that high because a
	// follower may take a long time to catch up with the leader when it
	// reboots under a reasonable workload (https://github.com/coreos/etcd/issues/2698)
	for i := 0; i < 60; i++ {
		for _, m := range clus.Members {
			if err = m.WriteHealthKey(); err != nil {
				clus.lg.Warn(
					"health check FAIL",
					zap.Int("retries", i),
					zap.String("endpoint", m.EtcdClientEndpoint),
					zap.Error(err),
				)
				break
			}
			clus.lg.Info(
				"health check PASS",
				zap.Int("retries", i),
				zap.String("endpoint", m.EtcdClientEndpoint),
			)
		}
		if err == nil {
			clus.lg.Info(
				"health check ALL PASS",
				zap.Int("round", clus.rd),
				zap.Int("case", clus.cs),
			)
			return nil
		}
		time.Sleep(time.Second)
	}
	return err
}

// GetLeader returns the index of the leader, and an error if any.
func (clus *Cluster) GetLeader() (int, error) {
	for i, m := range clus.Members {
		isLeader, err := m.IsLeader()
		if isLeader || err != nil {
			return i, err
		}
	}
	return 0, fmt.Errorf("no leader found")
}

// maxRev returns the maximum revision found on the cluster.
func (clus *Cluster) maxRev() (rev int64, err error) {
	ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
	defer cancel()
	revc, errc := make(chan int64, len(clus.Members)), make(chan error, len(clus.Members))
	for i := range clus.Members {
		go func(m *rpcpb.Member) {
			mrev, merr := m.Rev(ctx)
			revc <- mrev
			errc <- merr
		}(clus.Members[i])
	}
	for i := 0; i < len(clus.Members); i++ {
		if merr := <-errc; merr != nil {
			err = merr
		}
		if mrev := <-revc; mrev > rev {
			rev = mrev
		}
	}
	return rev, err
}

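// getRevisionHash fetches the current revision and KV-store hash from
// every member, keyed by client endpoint.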
func (clus *Cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
	revs := make(map[string]int64)
	hashes := make(map[string]int64)
	for _, m := range clus.Members {
		rev, hash, err := m.RevHash()
		if err != nil {
			return nil, nil, err
		}
		revs[m.EtcdClientEndpoint] = rev
		hashes[m.EtcdClientEndpoint] = hash
	}
	return revs, hashes, nil
}

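// compactKV issues a compaction at rev on every member, one at a time.
// A "required revision has been compacted" error from any member after
// the first is expected and ignored.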
func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
	if rev <= 0 {
		return nil
	}

	for i, m := range clus.Members {
		clus.lg.Info(
			"compact START",
			zap.String("endpoint", m.EtcdClientEndpoint),
			zap.Int64("compact-revision", rev),
			zap.Duration("timeout", timeout),
		)
		now := time.Now()

		cerr := m.Compact(rev, timeout)
		succeed := true
		if cerr != nil {
			if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {
				clus.lg.Info(
					"compact error is ignored",
					zap.String("endpoint", m.EtcdClientEndpoint),
					zap.Int64("compact-revision", rev),
					zap.Error(cerr),
				)
			} else {
				clus.lg.Warn(
					"compact FAIL",
					zap.String("endpoint", m.EtcdClientEndpoint),
					zap.Int64("compact-revision", rev),
					zap.Error(cerr),
				)
				err = cerr
				succeed = false
			}
		}

		if succeed {
			clus.lg.Info(
				"compact PASS",
				zap.String("endpoint", m.EtcdClientEndpoint),
				zap.Int64("compact-revision", rev),
				zap.Duration("timeout", timeout),
				zap.Duration("took", time.Since(now)),
			)
		}
	}
	return err
}

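// checkCompact asks every member to verify the compaction at rev;
// rev == 0 is a no-op.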
func (clus *Cluster) checkCompact(rev int64) error {
	if rev == 0 {
		return nil
	}
	for _, m := range clus.Members {
		if err := m.CheckCompact(rev); err != nil {
			return err
		}
	}
	return nil
}

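// defrag defragments every member in turn, failing fast on the first error.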
func (clus *Cluster) defrag() error {
	for _, m := range clus.Members {
		if err := m.Defrag(); err != nil {
			clus.lg.Warn(
				"defrag FAIL",
				zap.String("endpoint", m.EtcdClientEndpoint),
				zap.Error(err),
			)
			return err
		}
		clus.lg.Info(
			"defrag PASS",
			zap.String("endpoint", m.EtcdClientEndpoint),
		)
	}
	clus.lg.Info(
		"defrag ALL PASS",
		zap.Int("round", clus.rd),
		zap.Int("case", clus.cs),
	)
	return nil
}

// GetFailureDelayDuration computes the failure delay duration.
func (clus *Cluster) GetFailureDelayDuration() time.Duration {
	return time.Duration(clus.Tester.FailureDelayMs) * time.Millisecond
}

// Report reports the number of modified keys.
func (clus *Cluster) Report() int64 {
	return clus.stresser.ModifiedKeys()
}