tester.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "fmt"
  17. "time"
  18. )
// tester drives the functional test: it repeatedly injects the configured
// failures into a cluster while stressers generate load, and checks the
// cluster after each failure has been recovered.
type tester struct {
	// cluster is the cluster under test; failures are injected into it and
	// it is compacted, defragmented, cleaned up and reset through it.
	cluster *cluster
	// limit is the number of rounds runLoop executes; -1 means no limit.
	limit int
	// failures are the failure cases run, in order, in every round.
	failures []failure
	// status tracks the current round and case; logPrefix reads it.
	status Status
	// currentRevision is the revision last stored by updateRevision; runLoop
	// derives the compaction revision from it.
	currentRevision int64
	// stresserType and scfg are handed to NewStresser for every cluster
	// member when the stressers are (re)built.
	stresserType string
	scfg stressConfig
	// doChecks selects the checker: when false a no-op checker is installed.
	doChecks bool
	// stresser is the stresser built by resetStressCheck.
	stresser Stresser
	// checker is run by checkConsistency after each failure case.
	checker Checker
}
// compactQPS is the rough number of compact requests per second.
// Previous tests showed etcd can compact about 60,000 entries per second.
// runLoop adds one second to the compaction timeout for every compactQPS
// keys modified since the previous compaction.
const compactQPS = 50000
  34. func (tt *tester) runLoop() {
  35. tt.status.Since = time.Now()
  36. tt.status.RoundLimit = tt.limit
  37. tt.status.cluster = tt.cluster
  38. for _, f := range tt.failures {
  39. tt.status.Failures = append(tt.status.Failures, f.Desc())
  40. }
  41. if err := tt.resetStressCheck(); err != nil {
  42. plog.Errorf("%s failed to start stresser (%v)", tt.logPrefix(), err)
  43. return
  44. }
  45. var preModifiedKey int64
  46. for round := 0; round < tt.limit || tt.limit == -1; round++ {
  47. tt.status.setRound(round)
  48. roundTotalCounter.Inc()
  49. if err := tt.doRound(round); err != nil {
  50. plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err)
  51. if tt.cleanup() != nil {
  52. return
  53. }
  54. // reset preModifiedKey after clean up
  55. preModifiedKey = 0
  56. continue
  57. }
  58. // -1 so that logPrefix doesn't print out 'case'
  59. tt.status.setCase(-1)
  60. revToCompact := max(0, tt.currentRevision-10000)
  61. currentModifiedKey := tt.stresser.ModifiedKeys()
  62. modifiedKey := currentModifiedKey - preModifiedKey
  63. preModifiedKey = currentModifiedKey
  64. timeout := 10 * time.Second
  65. timeout += time.Duration(modifiedKey/compactQPS) * time.Second
  66. plog.Infof("%s compacting %d modifications (timeout %v)", tt.logPrefix(), modifiedKey, timeout)
  67. if err := tt.compact(revToCompact, timeout); err != nil {
  68. plog.Warningf("%s functional-tester compact got error (%v)", tt.logPrefix(), err)
  69. if tt.cleanup() != nil {
  70. return
  71. }
  72. // reset preModifiedKey after clean up
  73. preModifiedKey = 0
  74. }
  75. if round > 0 && round%500 == 0 { // every 500 rounds
  76. if err := tt.defrag(); err != nil {
  77. plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err)
  78. return
  79. }
  80. }
  81. }
  82. plog.Infof("%s functional-tester is finished", tt.logPrefix())
  83. }
  84. func (tt *tester) doRound(round int) error {
  85. for j, f := range tt.failures {
  86. caseTotalCounter.WithLabelValues(f.Desc()).Inc()
  87. tt.status.setCase(j)
  88. if err := tt.cluster.WaitHealth(); err != nil {
  89. return fmt.Errorf("wait full health error: %v", err)
  90. }
  91. plog.Infof("%s injecting failure %q", tt.logPrefix(), f.Desc())
  92. if err := f.Inject(tt.cluster, round); err != nil {
  93. return fmt.Errorf("injection error: %v", err)
  94. }
  95. plog.Infof("%s injected failure", tt.logPrefix())
  96. plog.Infof("%s recovering failure %q", tt.logPrefix(), f.Desc())
  97. if err := f.Recover(tt.cluster, round); err != nil {
  98. return fmt.Errorf("recovery error: %v", err)
  99. }
  100. plog.Infof("%s recovered failure", tt.logPrefix())
  101. tt.pauseStresser()
  102. plog.Infof("%s wait until cluster is healthy", tt.logPrefix())
  103. if err := tt.cluster.WaitHealth(); err != nil {
  104. return fmt.Errorf("wait full health error: %v", err)
  105. }
  106. plog.Infof("%s cluster is healthy", tt.logPrefix())
  107. plog.Infof("%s checking consistency and invariant of cluster", tt.logPrefix())
  108. if err := tt.checkConsistency(); err != nil {
  109. return fmt.Errorf("tt.checkConsistency error (%v)", err)
  110. }
  111. plog.Infof("%s checking consistency and invariant of cluster done", tt.logPrefix())
  112. plog.Infof("%s succeed!", tt.logPrefix())
  113. }
  114. return nil
  115. }
  116. func (tt *tester) updateRevision() error {
  117. revs, _, err := tt.cluster.getRevisionHash()
  118. for _, rev := range revs {
  119. tt.currentRevision = rev
  120. break // just need get one of the current revisions
  121. }
  122. plog.Infof("%s updated current revision to %d", tt.logPrefix(), tt.currentRevision)
  123. return err
  124. }
  125. func (tt *tester) checkConsistency() (err error) {
  126. defer func() {
  127. if err != nil {
  128. return
  129. }
  130. if err = tt.updateRevision(); err != nil {
  131. plog.Warningf("%s functional-tester returning with tt.updateRevision error (%v)", tt.logPrefix(), err)
  132. return
  133. }
  134. err = tt.startStresser()
  135. }()
  136. if err = tt.checker.Check(); err != nil {
  137. plog.Infof("%s %v", tt.logPrefix(), err)
  138. }
  139. return err
  140. }
  141. func (tt *tester) compact(rev int64, timeout time.Duration) (err error) {
  142. tt.pauseStresser()
  143. defer func() {
  144. if err == nil {
  145. err = tt.startStresser()
  146. }
  147. }()
  148. plog.Infof("%s compacting storage (current revision %d, compact revision %d)", tt.logPrefix(), tt.currentRevision, rev)
  149. if err = tt.cluster.compactKV(rev, timeout); err != nil {
  150. return err
  151. }
  152. plog.Infof("%s compacted storage (compact revision %d)", tt.logPrefix(), rev)
  153. plog.Infof("%s checking compaction (compact revision %d)", tt.logPrefix(), rev)
  154. if err = tt.cluster.checkCompact(rev); err != nil {
  155. plog.Warningf("%s checkCompact error (%v)", tt.logPrefix(), err)
  156. return err
  157. }
  158. plog.Infof("%s confirmed compaction (compact revision %d)", tt.logPrefix(), rev)
  159. return nil
  160. }
  161. func (tt *tester) defrag() error {
  162. plog.Infof("%s defragmenting...", tt.logPrefix())
  163. if err := tt.cluster.defrag(); err != nil {
  164. plog.Warningf("%s defrag error (%v)", tt.logPrefix(), err)
  165. if cerr := tt.cleanup(); cerr != nil {
  166. return fmt.Errorf("%s, %s", err, cerr)
  167. }
  168. return err
  169. }
  170. plog.Infof("%s defragmented...", tt.logPrefix())
  171. return nil
  172. }
  173. func (tt *tester) logPrefix() string {
  174. var (
  175. rd = tt.status.getRound()
  176. cs = tt.status.getCase()
  177. prefix = fmt.Sprintf("[round#%d case#%d]", rd, cs)
  178. )
  179. if cs == -1 {
  180. prefix = fmt.Sprintf("[round#%d]", rd)
  181. }
  182. return prefix
  183. }
  184. func (tt *tester) cleanup() error {
  185. roundFailedTotalCounter.Inc()
  186. desc := "compact/defrag"
  187. if tt.status.Case != -1 {
  188. desc = tt.failures[tt.status.Case].Desc()
  189. }
  190. caseFailedTotalCounter.WithLabelValues(desc).Inc()
  191. tt.closeStresser()
  192. if err := tt.cluster.Cleanup(); err != nil {
  193. plog.Warningf("%s cleanup error: %v", tt.logPrefix(), err)
  194. return err
  195. }
  196. if err := tt.cluster.Reset(); err != nil {
  197. plog.Warningf("%s cleanup Bootstrap error: %v", tt.logPrefix(), err)
  198. return err
  199. }
  200. return tt.resetStressCheck()
  201. }
// pauseStresser pauses the tester's stresser, logging before and after.
func (tt *tester) pauseStresser() {
	plog.Infof("%s pausing the stressers...", tt.logPrefix())
	tt.stresser.Pause()
	plog.Infof("%s paused stressers", tt.logPrefix())
}
  207. func (tt *tester) startStresser() (err error) {
  208. plog.Infof("%s starting the stressers...", tt.logPrefix())
  209. err = tt.stresser.Stress()
  210. plog.Infof("%s started stressers", tt.logPrefix())
  211. return err
  212. }
// closeStresser closes the tester's stresser, logging before and after.
func (tt *tester) closeStresser() {
	plog.Infof("%s closing the stressers...", tt.logPrefix())
	tt.stresser.Close()
	plog.Infof("%s closed stressers", tt.logPrefix())
}
  218. func (tt *tester) resetStressCheck() error {
  219. plog.Infof("%s resetting stressers and checkers...", tt.logPrefix())
  220. cs := &compositeStresser{}
  221. for _, m := range tt.cluster.Members {
  222. s := NewStresser(tt.stresserType, &tt.scfg, m)
  223. cs.stressers = append(cs.stressers, s)
  224. }
  225. tt.stresser = cs
  226. if !tt.doChecks {
  227. tt.checker = newNoChecker()
  228. return tt.startStresser()
  229. }
  230. chk := newHashChecker(hashAndRevGetter(tt.cluster))
  231. if schk := cs.Checker(); schk != nil {
  232. chk = newCompositeChecker([]Checker{chk, schk})
  233. }
  234. tt.checker = chk
  235. return tt.startStresser()
  236. }
  237. func (tt *tester) Report() int64 { return tt.stresser.ModifiedKeys() }