tester.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "fmt"
  17. "log"
  18. "sync"
  19. "time"
  20. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  21. "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
  22. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  23. )
  24. type tester struct {
  25. failures []failure
  26. cluster *cluster
  27. limit int
  28. status Status
  29. }
  30. func (tt *tester) runLoop() {
  31. tt.status.Since = time.Now()
  32. tt.status.RoundLimit = tt.limit
  33. tt.status.cluster = tt.cluster
  34. for _, f := range tt.failures {
  35. tt.status.Failures = append(tt.status.Failures, f.Desc())
  36. }
  37. for i := 0; i < tt.limit; i++ {
  38. tt.status.setRound(i)
  39. for j, f := range tt.failures {
  40. tt.status.setCase(j)
  41. if err := tt.cluster.WaitHealth(); err != nil {
  42. log.Printf("etcd-tester: [round#%d case#%d] wait full health error: %v", i, j, err)
  43. if err := tt.cleanup(i, j); err != nil {
  44. log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
  45. return
  46. }
  47. continue
  48. }
  49. log.Printf("etcd-tester: [round#%d case#%d] start failure %s", i, j, f.Desc())
  50. log.Printf("etcd-tester: [round#%d case#%d] start injecting failure...", i, j)
  51. if err := f.Inject(tt.cluster, i); err != nil {
  52. log.Printf("etcd-tester: [round#%d case#%d] injection error: %v", i, j, err)
  53. if err := tt.cleanup(i, j); err != nil {
  54. log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
  55. return
  56. }
  57. continue
  58. }
  59. log.Printf("etcd-tester: [round#%d case#%d] start recovering failure...", i, j)
  60. if err := f.Recover(tt.cluster, i); err != nil {
  61. log.Printf("etcd-tester: [round#%d case#%d] recovery error: %v", i, j, err)
  62. if err := tt.cleanup(i, j); err != nil {
  63. log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
  64. return
  65. }
  66. continue
  67. }
  68. if tt.cluster.v2Only {
  69. log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
  70. continue
  71. }
  72. log.Printf("etcd-tester: [round#%d case#%d] canceling the stressers...", i, j)
  73. for _, s := range tt.cluster.Stressers {
  74. s.Cancel()
  75. }
  76. log.Printf("etcd-tester: [round#%d case#%d] waiting 5s for pending PUTs to be committed across cluster...", i, j)
  77. time.Sleep(5 * time.Second)
  78. log.Printf("etcd-tester: [round#%d case#%d] starting checking consistency...", i, j)
  79. err := tt.cluster.checkConsistency()
  80. if err != nil {
  81. log.Printf("etcd-tester: [round#%d case#%d] checkConsistency error (%v)", i, j, err)
  82. if err := tt.cleanup(i, j); err != nil {
  83. log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
  84. return
  85. }
  86. } else {
  87. log.Printf("etcd-tester: [round#%d case#%d] all members are consistent!", i, j)
  88. log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
  89. }
  90. log.Printf("etcd-tester: [round#%d case#%d] restarting the stressers...", i, j)
  91. for _, s := range tt.cluster.Stressers {
  92. go s.Stress()
  93. }
  94. }
  95. }
  96. }
  97. func (tt *tester) cleanup(i, j int) error {
  98. log.Printf("etcd-tester: [round#%d case#%d] cleaning up...", i, j)
  99. if err := tt.cluster.Cleanup(); err != nil {
  100. return err
  101. }
  102. return tt.cluster.Bootstrap()
  103. }
  104. type Status struct {
  105. Since time.Time
  106. Failures []string
  107. RoundLimit int
  108. Cluster ClusterStatus
  109. cluster *cluster
  110. mu sync.Mutex // guards Round and Case
  111. Round int
  112. Case int
  113. }
  114. // get gets a copy of status
  115. func (s *Status) get() Status {
  116. s.mu.Lock()
  117. got := *s
  118. cluster := s.cluster
  119. s.mu.Unlock()
  120. got.Cluster = cluster.Status()
  121. return got
  122. }
  123. func (s *Status) setRound(r int) {
  124. s.mu.Lock()
  125. defer s.mu.Unlock()
  126. s.Round = r
  127. }
  128. func (s *Status) setCase(c int) {
  129. s.mu.Lock()
  130. defer s.mu.Unlock()
  131. s.Case = c
  132. }
  133. // checkConsistency stops the cluster for a moment and get the hashes of KV storages.
  134. func (c *cluster) checkConsistency() error {
  135. hashes := make(map[string]uint32)
  136. for _, u := range c.GRPCURLs {
  137. conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
  138. if err != nil {
  139. return err
  140. }
  141. kvc := pb.NewKVClient(conn)
  142. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  143. resp, err := kvc.Hash(ctx, &pb.HashRequest{})
  144. hv := resp.Hash
  145. if resp != nil && err != nil {
  146. return err
  147. }
  148. cancel()
  149. hashes[u] = hv
  150. }
  151. if !checkConsistency(hashes) {
  152. return fmt.Errorf("check consistency fails: %v", hashes)
  153. }
  154. return nil
  155. }
  156. // checkConsistency returns true if all nodes have the same KV hash values.
  157. func checkConsistency(hashes map[string]uint32) bool {
  158. var cv uint32
  159. isConsistent := true
  160. for _, v := range hashes {
  161. if cv == 0 {
  162. cv = v
  163. }
  164. if cv != v {
  165. isConsistent = false
  166. }
  167. }
  168. return isConsistent
  169. }