tester.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  // Copyright 2015 CoreOS, Inc.
  //
  // Licensed under the Apache License, Version 2.0 (the "License");
  // you may not use this file except in compliance with the License.
  // You may obtain a copy of the License at
  //
  //     http://www.apache.org/licenses/LICENSE-2.0
  //
  // Unless required by applicable law or agreed to in writing, software
  // distributed under the License is distributed on an "AS IS" BASIS,
  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  // See the License for the specific language governing permissions and
  // limitations under the License.
  14. package main
  15. import (
  16. "sync"
  17. "time"
  18. )
  19. type tester struct {
  20. failures []failure
  21. cluster *cluster
  22. limit int
  23. status Status
  24. }
  25. func (tt *tester) runLoop() {
  26. tt.status.Since = time.Now()
  27. tt.status.RoundLimit = tt.limit
  28. tt.status.cluster = tt.cluster
  29. for _, f := range tt.failures {
  30. tt.status.Failures = append(tt.status.Failures, f.Desc())
  31. }
  32. for i := 0; i < tt.limit; i++ {
  33. tt.status.setRound(i)
  34. roundTotalCounter.Inc()
  35. var currentRevision int64
  36. for j, f := range tt.failures {
  37. caseTotalCounter.WithLabelValues(f.Desc()).Inc()
  38. tt.status.setCase(j)
  39. if err := tt.cluster.WaitHealth(); err != nil {
  40. plog.Printf("[round#%d case#%d] wait full health error: %v", i, j, err)
  41. if err := tt.cleanup(i, j); err != nil {
  42. plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err)
  43. return
  44. }
  45. continue
  46. }
  47. plog.Printf("[round#%d case#%d] start failure %s", i, j, f.Desc())
  48. plog.Printf("[round#%d case#%d] start injecting failure...", i, j)
  49. if err := f.Inject(tt.cluster, i); err != nil {
  50. plog.Printf("[round#%d case#%d] injection error: %v", i, j, err)
  51. if err := tt.cleanup(i, j); err != nil {
  52. plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err)
  53. return
  54. }
  55. continue
  56. }
  57. plog.Printf("[round#%d case#%d] injected failure", i, j)
  58. plog.Printf("[round#%d case#%d] start recovering failure...", i, j)
  59. if err := f.Recover(tt.cluster, i); err != nil {
  60. plog.Printf("[round#%d case#%d] recovery error: %v", i, j, err)
  61. if err := tt.cleanup(i, j); err != nil {
  62. plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err)
  63. return
  64. }
  65. continue
  66. }
  67. plog.Printf("[round#%d case#%d] recovered failure", i, j)
  68. if tt.cluster.v2Only {
  69. plog.Printf("[round#%d case#%d] succeed!", i, j)
  70. continue
  71. }
  72. plog.Printf("[round#%d case#%d] canceling the stressers...", i, j)
  73. for _, s := range tt.cluster.Stressers {
  74. s.Cancel()
  75. }
  76. plog.Printf("[round#%d case#%d] canceled stressers", i, j)
  77. plog.Printf("[round#%d case#%d] checking current revisions...", i, j)
  78. var (
  79. revs map[string]int64
  80. hashes map[string]int64
  81. rerr error
  82. ok bool
  83. )
  84. for k := 0; k < 5; k++ {
  85. time.Sleep(time.Second)
  86. revs, hashes, rerr = tt.cluster.getRevisionHash()
  87. if rerr != nil {
  88. plog.Printf("[round#%d case#%d.%d] failed to get current revisions (%v)", i, j, k, rerr)
  89. continue
  90. }
  91. if currentRevision, ok = getSameValue(revs); ok {
  92. break
  93. }
  94. plog.Printf("[round#%d case#%d.%d] inconsistent current revisions %+v", i, j, k, revs)
  95. }
  96. if !ok || rerr != nil {
  97. plog.Printf("[round#%d case#%d] checking current revisions failed (%v)", i, j, revs)
  98. if err := tt.cleanup(i, j); err != nil {
  99. plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err)
  100. return
  101. }
  102. continue
  103. }
  104. plog.Printf("[round#%d case#%d] all members are consistent with current revisions", i, j)
  105. plog.Printf("[round#%d case#%d] checking current storage hashes...", i, j)
  106. if _, ok = getSameValue(hashes); !ok {
  107. plog.Printf("[round#%d case#%d] checking current storage hashes failed (%v)", i, j, hashes)
  108. if err := tt.cleanup(i, j); err != nil {
  109. plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err)
  110. return
  111. }
  112. continue
  113. }
  114. plog.Printf("[round#%d case#%d] all members are consistent with storage hashes", i, j)
  115. plog.Printf("[round#%d case#%d] restarting the stressers...", i, j)
  116. for _, s := range tt.cluster.Stressers {
  117. go s.Stress()
  118. }
  119. plog.Printf("[round#%d case#%d] succeed!", i, j)
  120. }
  121. revToCompact := max(0, currentRevision-10000)
  122. plog.Printf("[round#%d] compacting storage at %d (current revision %d)", i, revToCompact, currentRevision)
  123. if err := tt.cluster.compactKV(revToCompact); err != nil {
  124. plog.Printf("[round#%d] compactKV error (%v)", i, err)
  125. if err := tt.cleanup(i, 0); err != nil {
  126. plog.Printf("[round#%d] cleanup error: %v", i, err)
  127. return
  128. }
  129. continue
  130. }
  131. plog.Printf("[round#%d] compacted storage", i)
  132. // TODO: make sure compaction is finished
  133. time.Sleep(30 * time.Second)
  134. }
  135. }
  136. func (tt *tester) cleanup(i, j int) error {
  137. roundFailedTotalCounter.Inc()
  138. caseFailedTotalCounter.WithLabelValues(tt.failures[j].Desc()).Inc()
  139. plog.Printf("[round#%d case#%d] cleaning up...", i, j)
  140. if err := tt.cluster.Cleanup(); err != nil {
  141. return err
  142. }
  143. return tt.cluster.Bootstrap()
  144. }
  145. type Status struct {
  146. Since time.Time
  147. Failures []string
  148. RoundLimit int
  149. Cluster ClusterStatus
  150. cluster *cluster
  151. mu sync.Mutex // guards Round and Case
  152. Round int
  153. Case int
  154. }
  155. // get gets a copy of status
  156. func (s *Status) get() Status {
  157. s.mu.Lock()
  158. got := *s
  159. cluster := s.cluster
  160. s.mu.Unlock()
  161. got.Cluster = cluster.Status()
  162. return got
  163. }
  164. func (s *Status) setRound(r int) {
  165. s.mu.Lock()
  166. defer s.mu.Unlock()
  167. s.Round = r
  168. }
  169. func (s *Status) setCase(c int) {
  170. s.mu.Lock()
  171. defer s.mu.Unlock()
  172. s.Case = c
  173. }