tester.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "log"
  17. "sync"
  18. "time"
  19. )
  20. type tester struct {
  21. failures []failure
  22. cluster *cluster
  23. limit int
  24. status Status
  25. }
  26. func (tt *tester) runLoop() {
  27. tt.status.Since = time.Now()
  28. tt.status.RoundLimit = tt.limit
  29. tt.status.cluster = tt.cluster
  30. for _, f := range tt.failures {
  31. tt.status.Failures = append(tt.status.Failures, f.Desc())
  32. }
  33. for i := 0; i < tt.limit; i++ {
  34. tt.status.setRound(i)
  35. var currentRevision int64
  36. for j, f := range tt.failures {
  37. tt.status.setCase(j)
  38. if err := tt.cluster.WaitHealth(); err != nil {
  39. log.Printf("etcd-tester: [round#%d case#%d] wait full health error: %v", i, j, err)
  40. if err := tt.cleanup(i, j); err != nil {
  41. log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
  42. return
  43. }
  44. continue
  45. }
  46. log.Printf("etcd-tester: [round#%d case#%d] start failure %s", i, j, f.Desc())
  47. log.Printf("etcd-tester: [round#%d case#%d] start injecting failure...", i, j)
  48. if err := f.Inject(tt.cluster, i); err != nil {
  49. log.Printf("etcd-tester: [round#%d case#%d] injection error: %v", i, j, err)
  50. if err := tt.cleanup(i, j); err != nil {
  51. log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
  52. return
  53. }
  54. continue
  55. }
  56. log.Printf("etcd-tester: [round#%d case#%d] injected failure", i, j)
  57. log.Printf("etcd-tester: [round#%d case#%d] start recovering failure...", i, j)
  58. if err := f.Recover(tt.cluster, i); err != nil {
  59. log.Printf("etcd-tester: [round#%d case#%d] recovery error: %v", i, j, err)
  60. if err := tt.cleanup(i, j); err != nil {
  61. log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
  62. return
  63. }
  64. continue
  65. }
  66. log.Printf("etcd-tester: [round#%d case#%d] recovered failure", i, j)
  67. if tt.cluster.v2Only {
  68. log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
  69. continue
  70. }
  71. log.Printf("etcd-tester: [round#%d case#%d] canceling the stressers...", i, j)
  72. for _, s := range tt.cluster.Stressers {
  73. s.Cancel()
  74. }
  75. log.Printf("etcd-tester: [round#%d case#%d] canceled stressers", i, j)
  76. log.Printf("etcd-tester: [round#%d case#%d] checking current revisions...", i, j)
  77. var (
  78. revs map[string]int64
  79. hashes map[string]int64
  80. rerr error
  81. ok bool
  82. )
  83. for k := 0; k < 5; k++ {
  84. time.Sleep(time.Second)
  85. revs, hashes, rerr = tt.cluster.getRevisionHash()
  86. if rerr != nil {
  87. log.Printf("etcd-tester: [round#%d case#%d.%d] failed to get current revisions (%v)", i, j, k, rerr)
  88. continue
  89. }
  90. if currentRevision, ok = getSameValue(revs); ok {
  91. break
  92. }
  93. log.Printf("etcd-tester: [round#%d case#%d.%d] inconsistent current revisions %+v", i, j, k, revs)
  94. }
  95. if !ok || rerr != nil {
  96. log.Printf("etcd-tester: [round#%d case#%d] checking current revisions failed (%v)", i, j, revs)
  97. if err := tt.cleanup(i, j); err != nil {
  98. log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
  99. return
  100. }
  101. continue
  102. }
  103. log.Printf("etcd-tester: [round#%d case#%d] all members are consistent with current revisions", i, j)
  104. log.Printf("etcd-tester: [round#%d case#%d] checking current storage hashes...", i, j)
  105. if _, ok = getSameValue(hashes); !ok {
  106. log.Printf("etcd-tester: [round#%d case#%d] checking current storage hashes failed (%v)", i, j, hashes)
  107. if err := tt.cleanup(i, j); err != nil {
  108. log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
  109. return
  110. }
  111. continue
  112. }
  113. log.Printf("etcd-tester: [round#%d case#%d] all members are consistent with storage hashes", i, j)
  114. log.Printf("etcd-tester: [round#%d case#%d] restarting the stressers...", i, j)
  115. for _, s := range tt.cluster.Stressers {
  116. go s.Stress()
  117. }
  118. log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
  119. }
  120. revToCompact := max(0, currentRevision-10000)
  121. log.Printf("etcd-tester: [round#%d] compacting storage at %d (current revision %d)", i, revToCompact, currentRevision)
  122. if err := tt.cluster.compactKV(revToCompact); err != nil {
  123. log.Printf("etcd-tester: [round#%d] compactKV error (%v)", i, err)
  124. if err := tt.cleanup(i, 0); err != nil {
  125. log.Printf("etcd-tester: [round#%d] cleanup error: %v", i, err)
  126. return
  127. }
  128. continue
  129. }
  130. log.Printf("etcd-tester: [round#%d] compacted storage", i)
  131. // TODO: make sure compaction is finished
  132. time.Sleep(30 * time.Second)
  133. }
  134. }
  135. func (tt *tester) cleanup(i, j int) error {
  136. log.Printf("etcd-tester: [round#%d case#%d] cleaning up...", i, j)
  137. if err := tt.cluster.Cleanup(); err != nil {
  138. return err
  139. }
  140. return tt.cluster.Bootstrap()
  141. }
  142. type Status struct {
  143. Since time.Time
  144. Failures []string
  145. RoundLimit int
  146. Cluster ClusterStatus
  147. cluster *cluster
  148. mu sync.Mutex // guards Round and Case
  149. Round int
  150. Case int
  151. }
  152. // get gets a copy of status
  153. func (s *Status) get() Status {
  154. s.mu.Lock()
  155. got := *s
  156. cluster := s.cluster
  157. s.mu.Unlock()
  158. got.Cluster = cluster.Status()
  159. return got
  160. }
  161. func (s *Status) setRound(r int) {
  162. s.mu.Lock()
  163. defer s.mu.Unlock()
  164. s.Round = r
  165. }
  166. func (s *Status) setCase(c int) {
  167. s.mu.Lock()
  168. defer s.mu.Unlock()
  169. s.Case = c
  170. }