tester.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "fmt"
  17. "os"
  18. "time"
  19. )
  20. type tester struct {
  21. cluster *cluster
  22. limit int
  23. exitOnFailure bool
  24. failures []failure
  25. status Status
  26. currentRevision int64
  27. stresserType string
  28. scfg stressConfig
  29. doChecks bool
  30. stresser Stresser
  31. checker Checker
  32. }
  33. // compactQPS is rough number of compact requests per second.
  34. // Previous tests showed etcd can compact about 60,000 entries per second.
  35. const compactQPS = 50000
  36. func (tt *tester) runLoop() {
  37. tt.status.Since = time.Now()
  38. tt.status.RoundLimit = tt.limit
  39. tt.status.cluster = tt.cluster
  40. for _, f := range tt.failures {
  41. tt.status.Failures = append(tt.status.Failures, f.Desc())
  42. }
  43. if err := tt.resetStressCheck(); err != nil {
  44. plog.Errorf("%s failed to start stresser (%v)", tt.logPrefix(), err)
  45. tt.failed()
  46. return
  47. }
  48. var preModifiedKey int64
  49. for round := 0; round < tt.limit || tt.limit == -1; round++ {
  50. tt.status.setRound(round)
  51. roundTotalCounter.Inc()
  52. if err := tt.doRound(round); err != nil {
  53. plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err)
  54. if tt.cleanup() != nil {
  55. return
  56. }
  57. // reset preModifiedKey after clean up
  58. preModifiedKey = 0
  59. continue
  60. }
  61. // -1 so that logPrefix doesn't print out 'case'
  62. tt.status.setCase(-1)
  63. revToCompact := max(0, tt.currentRevision-10000)
  64. currentModifiedKey := tt.stresser.ModifiedKeys()
  65. modifiedKey := currentModifiedKey - preModifiedKey
  66. preModifiedKey = currentModifiedKey
  67. timeout := 10 * time.Second
  68. timeout += time.Duration(modifiedKey/compactQPS) * time.Second
  69. plog.Infof("%s compacting %d modifications (timeout %v)", tt.logPrefix(), modifiedKey, timeout)
  70. if err := tt.compact(revToCompact, timeout); err != nil {
  71. plog.Warningf("%s functional-tester compact got error (%v)", tt.logPrefix(), err)
  72. if tt.cleanup() != nil {
  73. return
  74. }
  75. // reset preModifiedKey after clean up
  76. preModifiedKey = 0
  77. }
  78. if round > 0 && round%500 == 0 { // every 500 rounds
  79. if err := tt.defrag(); err != nil {
  80. plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err)
  81. tt.failed()
  82. return
  83. }
  84. }
  85. }
  86. plog.Infof("%s functional-tester is finished", tt.logPrefix())
  87. }
  88. func (tt *tester) doRound(round int) error {
  89. for j, f := range tt.failures {
  90. caseTotalCounter.WithLabelValues(f.Desc()).Inc()
  91. tt.status.setCase(j)
  92. if err := tt.cluster.WaitHealth(); err != nil {
  93. return fmt.Errorf("wait full health error: %v", err)
  94. }
  95. plog.Infof("%s injecting failure %q", tt.logPrefix(), f.Desc())
  96. if err := f.Inject(tt.cluster, round); err != nil {
  97. return fmt.Errorf("injection error: %v", err)
  98. }
  99. plog.Infof("%s injected failure", tt.logPrefix())
  100. plog.Infof("%s recovering failure %q", tt.logPrefix(), f.Desc())
  101. if err := f.Recover(tt.cluster, round); err != nil {
  102. return fmt.Errorf("recovery error: %v", err)
  103. }
  104. plog.Infof("%s recovered failure", tt.logPrefix())
  105. tt.pauseStresser()
  106. plog.Infof("%s wait until cluster is healthy", tt.logPrefix())
  107. if err := tt.cluster.WaitHealth(); err != nil {
  108. return fmt.Errorf("wait full health error: %v", err)
  109. }
  110. plog.Infof("%s cluster is healthy", tt.logPrefix())
  111. plog.Infof("%s checking consistency and invariant of cluster", tt.logPrefix())
  112. if err := tt.checkConsistency(); err != nil {
  113. return fmt.Errorf("tt.checkConsistency error (%v)", err)
  114. }
  115. plog.Infof("%s checking consistency and invariant of cluster done", tt.logPrefix())
  116. plog.Infof("%s succeed!", tt.logPrefix())
  117. }
  118. return nil
  119. }
  120. func (tt *tester) updateRevision() error {
  121. revs, _, err := tt.cluster.getRevisionHash()
  122. for _, rev := range revs {
  123. tt.currentRevision = rev
  124. break // just need get one of the current revisions
  125. }
  126. plog.Infof("%s updated current revision to %d", tt.logPrefix(), tt.currentRevision)
  127. return err
  128. }
  129. func (tt *tester) checkConsistency() (err error) {
  130. defer func() {
  131. if err != nil {
  132. return
  133. }
  134. if err = tt.updateRevision(); err != nil {
  135. plog.Warningf("%s functional-tester returning with tt.updateRevision error (%v)", tt.logPrefix(), err)
  136. return
  137. }
  138. err = tt.startStresser()
  139. }()
  140. if err = tt.checker.Check(); err != nil {
  141. plog.Infof("%s %v", tt.logPrefix(), err)
  142. }
  143. return err
  144. }
  145. func (tt *tester) compact(rev int64, timeout time.Duration) (err error) {
  146. tt.pauseStresser()
  147. defer func() {
  148. if err == nil {
  149. err = tt.startStresser()
  150. }
  151. }()
  152. plog.Infof("%s compacting storage (current revision %d, compact revision %d)", tt.logPrefix(), tt.currentRevision, rev)
  153. if err = tt.cluster.compactKV(rev, timeout); err != nil {
  154. return err
  155. }
  156. plog.Infof("%s compacted storage (compact revision %d)", tt.logPrefix(), rev)
  157. plog.Infof("%s checking compaction (compact revision %d)", tt.logPrefix(), rev)
  158. if err = tt.cluster.checkCompact(rev); err != nil {
  159. plog.Warningf("%s checkCompact error (%v)", tt.logPrefix(), err)
  160. return err
  161. }
  162. plog.Infof("%s confirmed compaction (compact revision %d)", tt.logPrefix(), rev)
  163. return nil
  164. }
  165. func (tt *tester) defrag() error {
  166. plog.Infof("%s defragmenting...", tt.logPrefix())
  167. if err := tt.cluster.defrag(); err != nil {
  168. plog.Warningf("%s defrag error (%v)", tt.logPrefix(), err)
  169. if cerr := tt.cleanup(); cerr != nil {
  170. return fmt.Errorf("%s, %s", err, cerr)
  171. }
  172. return err
  173. }
  174. plog.Infof("%s defragmented...", tt.logPrefix())
  175. return nil
  176. }
  177. func (tt *tester) logPrefix() string {
  178. var (
  179. rd = tt.status.getRound()
  180. cs = tt.status.getCase()
  181. prefix = fmt.Sprintf("[round#%d case#%d]", rd, cs)
  182. )
  183. if cs == -1 {
  184. prefix = fmt.Sprintf("[round#%d]", rd)
  185. }
  186. return prefix
  187. }
  188. func (tt *tester) failed() {
  189. if !tt.exitOnFailure {
  190. return
  191. }
  192. plog.Warningf("%s exiting on failure", tt.logPrefix())
  193. tt.cluster.Terminate()
  194. os.Exit(2)
  195. }
  196. func (tt *tester) cleanup() error {
  197. defer tt.failed()
  198. roundFailedTotalCounter.Inc()
  199. desc := "compact/defrag"
  200. if tt.status.Case != -1 {
  201. desc = tt.failures[tt.status.Case].Desc()
  202. }
  203. caseFailedTotalCounter.WithLabelValues(desc).Inc()
  204. tt.closeStresser()
  205. if err := tt.cluster.Cleanup(); err != nil {
  206. plog.Warningf("%s cleanup error: %v", tt.logPrefix(), err)
  207. return err
  208. }
  209. if err := tt.cluster.Reset(); err != nil {
  210. plog.Warningf("%s cleanup Bootstrap error: %v", tt.logPrefix(), err)
  211. return err
  212. }
  213. return tt.resetStressCheck()
  214. }
  215. func (tt *tester) pauseStresser() {
  216. plog.Infof("%s pausing the stressers...", tt.logPrefix())
  217. tt.stresser.Pause()
  218. plog.Infof("%s paused stressers", tt.logPrefix())
  219. }
  220. func (tt *tester) startStresser() (err error) {
  221. plog.Infof("%s starting the stressers...", tt.logPrefix())
  222. err = tt.stresser.Stress()
  223. plog.Infof("%s started stressers", tt.logPrefix())
  224. return err
  225. }
  226. func (tt *tester) closeStresser() {
  227. plog.Infof("%s closing the stressers...", tt.logPrefix())
  228. tt.stresser.Close()
  229. plog.Infof("%s closed stressers", tt.logPrefix())
  230. }
  231. func (tt *tester) resetStressCheck() error {
  232. plog.Infof("%s resetting stressers and checkers...", tt.logPrefix())
  233. cs := &compositeStresser{}
  234. for _, m := range tt.cluster.Members {
  235. s := NewStresser(tt.stresserType, &tt.scfg, m)
  236. cs.stressers = append(cs.stressers, s)
  237. }
  238. tt.stresser = cs
  239. if !tt.doChecks {
  240. tt.checker = newNoChecker()
  241. return tt.startStresser()
  242. }
  243. chk := newHashChecker(hashAndRevGetter(tt.cluster))
  244. if schk := cs.Checker(); schk != nil {
  245. chk = newCompositeChecker([]Checker{chk, schk})
  246. }
  247. tt.checker = chk
  248. return tt.startStresser()
  249. }
  250. func (tt *tester) Report() int64 { return tt.stresser.ModifiedKeys() }