cluster_run.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "fmt"
  17. "os"
  18. "time"
  19. "github.com/coreos/etcd/functional/rpcpb"
  20. "github.com/coreos/etcd/pkg/fileutil"
  21. "go.uber.org/zap"
  22. )
  // compactQPS is a rough estimate of the number of compact requests served per second.
  // Previous tests showed that etcd can compact about 60,000 entries per second.
  25. const compactQPS = 50000
  26. // Run starts tester.
  27. func (clus *Cluster) Run() {
  28. defer printReport()
  29. if err := fileutil.TouchDirAll(clus.Tester.DataDir); err != nil {
  30. clus.lg.Panic(
  31. "failed to create test data directory",
  32. zap.String("dir", clus.Tester.DataDir),
  33. zap.Error(err),
  34. )
  35. }
  36. var preModifiedKey int64
  37. for round := 0; round < int(clus.Tester.RoundLimit) || clus.Tester.RoundLimit == -1; round++ {
  38. roundTotalCounter.Inc()
  39. clus.rd = round
  40. if err := clus.doRound(); err != nil {
  41. clus.lg.Warn(
  42. "round FAIL",
  43. zap.Int("round", clus.rd),
  44. zap.Int("case", clus.cs),
  45. zap.Error(err),
  46. )
  47. if clus.cleanup() != nil {
  48. return
  49. }
  50. // reset preModifiedKey after clean up
  51. preModifiedKey = 0
  52. continue
  53. }
  54. // -1 so that logPrefix doesn't print out 'case'
  55. clus.cs = -1
  56. revToCompact := max(0, clus.currentRevision-10000)
  57. currentModifiedKey := clus.stresser.ModifiedKeys()
  58. modifiedKey := currentModifiedKey - preModifiedKey
  59. preModifiedKey = currentModifiedKey
  60. timeout := 10 * time.Second
  61. timeout += time.Duration(modifiedKey/compactQPS) * time.Second
  62. clus.lg.Info(
  63. "compact START",
  64. zap.Int("round", clus.rd),
  65. zap.Int("case", clus.cs),
  66. zap.Duration("timeout", timeout),
  67. )
  68. if err := clus.compact(revToCompact, timeout); err != nil {
  69. clus.lg.Warn(
  70. "compact FAIL",
  71. zap.Int("round", clus.rd),
  72. zap.Int("case", clus.cs),
  73. zap.Error(err),
  74. )
  75. if err = clus.cleanup(); err != nil {
  76. clus.lg.Warn(
  77. "cleanup FAIL",
  78. zap.Int("round", clus.rd),
  79. zap.Int("case", clus.cs),
  80. zap.Error(err),
  81. )
  82. return
  83. }
  84. // reset preModifiedKey after clean up
  85. preModifiedKey = 0
  86. }
  87. if round > 0 && round%500 == 0 { // every 500 rounds
  88. if err := clus.defrag(); err != nil {
  89. clus.failed()
  90. return
  91. }
  92. }
  93. }
  94. clus.lg.Info(
  95. "functional-tester PASS",
  96. zap.Int("round", clus.rd),
  97. zap.Int("case", clus.cs),
  98. )
  99. }
  100. func (clus *Cluster) doRound() error {
  101. if clus.Tester.FailureShuffle {
  102. clus.shuffleFailures()
  103. }
  104. roundNow := time.Now()
  105. clus.lg.Info(
  106. "round START",
  107. zap.Int("round", clus.rd),
  108. zap.Strings("failures", clus.failureStrings()),
  109. zap.Int("total-failures", len(clus.failures)),
  110. )
  111. for i, fa := range clus.failures {
  112. clus.cs = i
  113. caseTotal[fa.Desc()]++
  114. caseTotalCounter.WithLabelValues(fa.Desc()).Inc()
  115. caseNow := time.Now()
  116. clus.lg.Info(
  117. "case START",
  118. zap.Int("round", clus.rd),
  119. zap.Int("case", clus.cs),
  120. zap.String("desc", fa.Desc()),
  121. zap.Int("total-failures", len(clus.failures)),
  122. )
  123. clus.lg.Info("wait health before injecting failures")
  124. if err := clus.WaitHealth(); err != nil {
  125. return fmt.Errorf("wait full health error: %v", err)
  126. }
  127. stressStarted := false
  128. fcase := fa.FailureCase()
  129. if fcase != rpcpb.FailureCase_NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS {
  130. clus.lg.Info(
  131. "stresser START",
  132. zap.Int("round", clus.rd),
  133. zap.Int("case", clus.cs),
  134. zap.String("desc", fa.Desc()),
  135. )
  136. if err := clus.stresser.Stress(); err != nil {
  137. return fmt.Errorf("start stresser error: %v", err)
  138. }
  139. stressStarted = true
  140. }
  141. clus.lg.Info(
  142. "inject START",
  143. zap.Int("round", clus.rd),
  144. zap.Int("case", clus.cs),
  145. zap.String("desc", fa.Desc()),
  146. )
  147. if err := fa.Inject(clus); err != nil {
  148. return fmt.Errorf("injection error: %v", err)
  149. }
  150. // if run local, recovering server may conflict
  151. // with stressing client ports
  152. // TODO: use unix for local tests
  153. clus.lg.Info(
  154. "recover START",
  155. zap.Int("round", clus.rd),
  156. zap.Int("case", clus.cs),
  157. zap.String("desc", fa.Desc()),
  158. )
  159. if err := fa.Recover(clus); err != nil {
  160. return fmt.Errorf("recovery error: %v", err)
  161. }
  162. if stressStarted {
  163. clus.lg.Info("stresser PAUSE")
  164. ems := clus.stresser.Pause()
  165. if fcase == rpcpb.FailureCase_NO_FAIL_WITH_STRESS && len(ems) > 0 {
  166. ess := make([]string, 0, len(ems))
  167. cnt := 0
  168. for k, v := range ems {
  169. ess = append(ess, fmt.Sprintf("%s (count: %d)", k, v))
  170. cnt += v
  171. }
  172. clus.lg.Warn(
  173. "expected no errors",
  174. zap.String("desc", fa.Desc()),
  175. zap.Strings("errors", ess),
  176. )
  177. // with network delay, some ongoing requests may fail
  178. // only return error, if more than 10% of QPS requests fail
  179. if cnt > int(clus.Tester.StressQPS)/10 {
  180. return fmt.Errorf("expected no error in %q, got %q", fcase.String(), ess)
  181. }
  182. }
  183. }
  184. clus.lg.Info("health check START")
  185. if err := clus.WaitHealth(); err != nil {
  186. return fmt.Errorf("wait full health error: %v", err)
  187. }
  188. clus.lg.Info("consistency check START")
  189. if err := clus.checkConsistency(); err != nil {
  190. return fmt.Errorf("consistency check error (%v)", err)
  191. }
  192. clus.lg.Info(
  193. "case PASS",
  194. zap.Int("round", clus.rd),
  195. zap.Int("case", clus.cs),
  196. zap.String("desc", fa.Desc()),
  197. zap.Int("total-failures", len(clus.failures)),
  198. zap.Duration("took", time.Since(caseNow)),
  199. )
  200. }
  201. clus.lg.Info(
  202. "round ALL PASS",
  203. zap.Int("round", clus.rd),
  204. zap.Strings("failures", clus.failureStrings()),
  205. zap.Int("total-failures", len(clus.failures)),
  206. zap.Duration("took", time.Since(roundNow)),
  207. )
  208. return nil
  209. }
  210. func (clus *Cluster) updateRevision() error {
  211. revs, _, err := clus.getRevisionHash()
  212. for _, rev := range revs {
  213. clus.currentRevision = rev
  214. break // just need get one of the current revisions
  215. }
  216. clus.lg.Info(
  217. "updated current revision",
  218. zap.Int64("current-revision", clus.currentRevision),
  219. )
  220. return err
  221. }
  222. func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
  223. if err = clus.compactKV(rev, timeout); err != nil {
  224. clus.lg.Warn(
  225. "compact FAIL",
  226. zap.Int64("current-revision", clus.currentRevision),
  227. zap.Int64("compact-revision", rev),
  228. zap.Error(err),
  229. )
  230. return err
  231. }
  232. clus.lg.Info(
  233. "compact DONE",
  234. zap.Int64("current-revision", clus.currentRevision),
  235. zap.Int64("compact-revision", rev),
  236. )
  237. if err = clus.checkCompact(rev); err != nil {
  238. clus.lg.Warn(
  239. "check compact FAIL",
  240. zap.Int64("current-revision", clus.currentRevision),
  241. zap.Int64("compact-revision", rev),
  242. zap.Error(err),
  243. )
  244. return err
  245. }
  246. clus.lg.Info(
  247. "check compact DONE",
  248. zap.Int64("current-revision", clus.currentRevision),
  249. zap.Int64("compact-revision", rev),
  250. )
  251. return nil
  252. }
  253. func (clus *Cluster) failed() {
  254. if !clus.Tester.ExitOnFailure {
  255. return
  256. }
  257. clus.lg.Info(
  258. "functional-tester FAIL",
  259. zap.Int("round", clus.rd),
  260. zap.Int("case", clus.cs),
  261. )
  262. clus.DestroyEtcdAgents()
  263. os.Exit(2)
  264. }
  265. func (clus *Cluster) cleanup() error {
  266. defer clus.failed()
  267. roundFailedTotalCounter.Inc()
  268. desc := "compact/defrag"
  269. if clus.cs != -1 {
  270. desc = clus.failures[clus.cs].Desc()
  271. }
  272. caseFailedTotalCounter.WithLabelValues(desc).Inc()
  273. clus.lg.Info(
  274. "closing stressers before archiving failure data",
  275. zap.Int("round", clus.rd),
  276. zap.Int("case", clus.cs),
  277. )
  278. clus.stresser.Close()
  279. if err := clus.FailArchive(); err != nil {
  280. clus.lg.Warn(
  281. "cleanup FAIL",
  282. zap.Int("round", clus.rd),
  283. zap.Int("case", clus.cs),
  284. zap.Error(err),
  285. )
  286. return err
  287. }
  288. if err := clus.Restart(); err != nil {
  289. clus.lg.Warn(
  290. "restart FAIL",
  291. zap.Int("round", clus.rd),
  292. zap.Int("case", clus.cs),
  293. zap.Error(err),
  294. )
  295. return err
  296. }
  297. clus.updateStresserChecker()
  298. return nil
  299. }