tester.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "sync"
  17. "time"
  18. )
  19. type tester struct {
  20. failures []failure
  21. cluster *cluster
  22. limit int
  23. status Status
  24. }
  25. func (tt *tester) runLoop() {
  26. tt.status.Since = time.Now()
  27. tt.status.RoundLimit = tt.limit
  28. tt.status.cluster = tt.cluster
  29. for _, f := range tt.failures {
  30. tt.status.Failures = append(tt.status.Failures, f.Desc())
  31. }
  32. for i := 0; i < tt.limit; i++ {
  33. tt.status.setRound(i)
  34. roundTotalCounter.Inc()
  35. var (
  36. currentRevision int64
  37. success bool
  38. )
  39. for j, f := range tt.failures {
  40. caseTotalCounter.WithLabelValues(f.Desc()).Inc()
  41. tt.status.setCase(j)
  42. if err := tt.cluster.WaitHealth(); err != nil {
  43. plog.Printf("[round#%d case#%d] wait full health error: %v", i, j, err)
  44. if err := tt.cleanup(i, j); err != nil {
  45. plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err)
  46. return
  47. }
  48. break
  49. }
  50. plog.Printf("[round#%d case#%d] start failure %s", i, j, f.Desc())
  51. plog.Printf("[round#%d case#%d] start injecting failure...", i, j)
  52. if err := f.Inject(tt.cluster, i); err != nil {
  53. plog.Printf("[round#%d case#%d] injection error: %v", i, j, err)
  54. if err := tt.cleanup(i, j); err != nil {
  55. plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err)
  56. return
  57. }
  58. break
  59. }
  60. plog.Printf("[round#%d case#%d] injected failure", i, j)
  61. plog.Printf("[round#%d case#%d] start recovering failure...", i, j)
  62. if err := f.Recover(tt.cluster, i); err != nil {
  63. plog.Printf("[round#%d case#%d] recovery error: %v", i, j, err)
  64. if err := tt.cleanup(i, j); err != nil {
  65. plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err)
  66. return
  67. }
  68. break
  69. }
  70. plog.Printf("[round#%d case#%d] recovered failure", i, j)
  71. if tt.cluster.v2Only {
  72. plog.Printf("[round#%d case#%d] succeed!", i, j)
  73. continue
  74. }
  75. plog.Printf("[round#%d case#%d] canceling the stressers...", i, j)
  76. for _, s := range tt.cluster.Stressers {
  77. s.Cancel()
  78. }
  79. plog.Printf("[round#%d case#%d] canceled stressers", i, j)
  80. plog.Printf("[round#%d case#%d] checking current revisions...", i, j)
  81. var (
  82. revs map[string]int64
  83. hashes map[string]int64
  84. rerr error
  85. ok bool
  86. )
  87. for k := 0; k < 5; k++ {
  88. time.Sleep(time.Second)
  89. revs, hashes, rerr = tt.cluster.getRevisionHash()
  90. if rerr != nil {
  91. plog.Printf("[round#%d case#%d.%d] failed to get current revisions (%v)", i, j, k, rerr)
  92. continue
  93. }
  94. if currentRevision, ok = getSameValue(revs); ok {
  95. break
  96. }
  97. plog.Printf("[round#%d case#%d.%d] inconsistent current revisions %+v", i, j, k, revs)
  98. }
  99. if !ok || rerr != nil {
  100. plog.Printf("[round#%d case#%d] checking current revisions failed [revisions: %v]", i, j, revs)
  101. if err := tt.cleanup(i, j); err != nil {
  102. plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err)
  103. return
  104. }
  105. break
  106. }
  107. plog.Printf("[round#%d case#%d] all members are consistent with current revisions [revisions: %v]", i, j, revs)
  108. plog.Printf("[round#%d case#%d] checking current storage hashes...", i, j)
  109. if _, ok = getSameValue(hashes); !ok {
  110. plog.Printf("[round#%d case#%d] checking current storage hashes failed [hashes: %v]", i, j, hashes)
  111. if err := tt.cleanup(i, j); err != nil {
  112. plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err)
  113. return
  114. }
  115. break
  116. }
  117. plog.Printf("[round#%d case#%d] all members are consistent with storage hashes", i, j)
  118. plog.Printf("[round#%d case#%d] restarting the stressers...", i, j)
  119. for _, s := range tt.cluster.Stressers {
  120. go s.Stress()
  121. }
  122. plog.Printf("[round#%d case#%d] succeed!", i, j)
  123. success = true
  124. }
  125. if !success {
  126. continue
  127. }
  128. revToCompact := max(0, currentRevision-10000)
  129. plog.Printf("[round#%d] compacting storage at %d (current revision %d)", i, revToCompact, currentRevision)
  130. if err := tt.cluster.compactKV(revToCompact); err != nil {
  131. plog.Printf("[round#%d] compactKV error (%v)", i, err)
  132. if err := tt.cleanup(i, 0); err != nil {
  133. plog.Printf("[round#%d] cleanup error: %v", i, err)
  134. return
  135. }
  136. continue
  137. }
  138. plog.Printf("[round#%d] compacted storage", i)
  139. plog.Printf("[round#%d] check compaction at %d", i, revToCompact)
  140. if err := tt.cluster.checkCompact(revToCompact); err != nil {
  141. plog.Printf("[round#%d] checkCompact error (%v)", i, err)
  142. if err := tt.cleanup(i, 0); err != nil {
  143. plog.Printf("[round#%d] cleanup error: %v", i, err)
  144. return
  145. }
  146. }
  147. plog.Printf("[round#%d] confirmed compaction at %d", i, revToCompact)
  148. if i > 0 && i%500 == 0 { // every 500 rounds
  149. plog.Printf("[round#%d] canceling the stressers...", i)
  150. for _, s := range tt.cluster.Stressers {
  151. s.Cancel()
  152. }
  153. plog.Printf("[round#%d] canceled stressers", i)
  154. plog.Printf("[round#%d] deframenting...", i)
  155. if err := tt.cluster.defrag(); err != nil {
  156. plog.Printf("[round#%d] defrag error (%v)", i, err)
  157. if err := tt.cleanup(i, 0); err != nil {
  158. plog.Printf("[round#%d] cleanup error: %v", i, err)
  159. return
  160. }
  161. }
  162. plog.Printf("[round#%d] deframented...", i)
  163. plog.Printf("[round#%d] restarting the stressers...", i)
  164. for _, s := range tt.cluster.Stressers {
  165. go s.Stress()
  166. }
  167. }
  168. }
  169. }
  170. func (tt *tester) cleanup(i, j int) error {
  171. roundFailedTotalCounter.Inc()
  172. caseFailedTotalCounter.WithLabelValues(tt.failures[j].Desc()).Inc()
  173. plog.Printf("[round#%d case#%d] cleaning up...", i, j)
  174. if err := tt.cluster.Cleanup(); err != nil {
  175. return err
  176. }
  177. return tt.cluster.Bootstrap()
  178. }
  179. type Status struct {
  180. Since time.Time
  181. Failures []string
  182. RoundLimit int
  183. Cluster ClusterStatus
  184. cluster *cluster
  185. mu sync.Mutex // guards Round and Case
  186. Round int
  187. Case int
  188. }
  189. func (s *Status) setRound(r int) {
  190. s.mu.Lock()
  191. defer s.mu.Unlock()
  192. s.Round = r
  193. }
  194. func (s *Status) setCase(c int) {
  195. s.mu.Lock()
  196. defer s.mu.Unlock()
  197. s.Case = c
  198. }