failure.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "fmt"
  17. "math/rand"
  18. "time"
  19. "github.com/coreos/etcd/tools/functional-tester/rpcpb"
  20. )
// Failure defines the failure injection interface.
// To add a fail case:
//  1. implement the "Failure" interface
//  2. define the fail case name in "rpcpb.FailureCase"
type Failure interface {
	// Inject injects the failure into the testing cluster at the given
	// round. When calling the function, the cluster should be healthy.
	Inject(clus *Cluster) error
	// Recover recovers the injected failure caused by the injection of the
	// given round and waits for the recovery of the testing cluster.
	Recover(clus *Cluster) error
	// Desc returns a description of the failure.
	Desc() string
	// FailureCase returns the "rpcpb.FailureCase" enum value of the failure.
	FailureCase() rpcpb.FailureCase
}
// injectMemberFunc injects a failure into one member of the cluster. The
// callers in this file pass a member index as the int argument.
type injectMemberFunc func(*Cluster, int) error

// recoverMemberFunc recovers one member of the cluster from an injected
// failure. The callers in this file pass a member index as the int argument.
type recoverMemberFunc func(*Cluster, int) error
// failureByFunc bundles what the failure types in this file share: an
// optional description, the failure case, and the per-member inject and
// recover callbacks.
type failureByFunc struct {
	// desc is embedded; when it is empty, failureByFunc.Desc falls back to
	// failureCase.String().
	desc
	failureCase   rpcpb.FailureCase
	injectMember  injectMemberFunc
	recoverMember recoverMemberFunc
}
  45. func (f *failureByFunc) Desc() string {
  46. if string(f.desc) != "" {
  47. return string(f.desc)
  48. }
  49. return f.failureCase.String()
  50. }
  51. func (f *failureByFunc) FailureCase() rpcpb.FailureCase {
  52. return f.failureCase
  53. }
// failureFollower applies the embedded failureByFunc callbacks to a single
// member chosen by updateIndex, which steps past the current leader's index.
type failureFollower struct {
	failureByFunc
	// last is the index of the member targeted by the most recent Inject;
	// updateIndex treats -1 as "no target chosen yet".
	last int
	// lead is the leader index returned by the most recent clus.GetLeader call.
	lead int
}
  59. func (f *failureFollower) updateIndex(clus *Cluster) error {
  60. idx, err := clus.GetLeader()
  61. if err != nil {
  62. return err
  63. }
  64. f.lead = idx
  65. n := len(clus.Members)
  66. if f.last == -1 { // first run
  67. f.last = clus.rd % n
  68. if f.last == f.lead {
  69. f.last = (f.last + 1) % n
  70. }
  71. } else {
  72. f.last = (f.last + 1) % n
  73. if f.last == f.lead {
  74. f.last = (f.last + 1) % n
  75. }
  76. }
  77. return nil
  78. }
  79. func (f *failureFollower) Inject(clus *Cluster) error {
  80. if err := f.updateIndex(clus); err != nil {
  81. return err
  82. }
  83. return f.injectMember(clus, f.last)
  84. }
  85. func (f *failureFollower) Recover(clus *Cluster) error {
  86. return f.recoverMember(clus, f.last)
  87. }
  88. func (f *failureFollower) FailureCase() rpcpb.FailureCase { return f.failureCase }
// failureLeader applies the embedded failureByFunc callbacks to the member
// that clus.GetLeader reports as leader at injection time.
type failureLeader struct {
	failureByFunc
	// last is the index of the member targeted by the most recent Inject.
	last int
	// lead is the leader index returned by the most recent clus.GetLeader
	// call; updateIndex sets it to the same value as last.
	lead int
}
  94. func (f *failureLeader) updateIndex(clus *Cluster) error {
  95. idx, err := clus.GetLeader()
  96. if err != nil {
  97. return err
  98. }
  99. f.lead = idx
  100. f.last = idx
  101. return nil
  102. }
  103. func (f *failureLeader) Inject(clus *Cluster) error {
  104. if err := f.updateIndex(clus); err != nil {
  105. return err
  106. }
  107. return f.injectMember(clus, f.last)
  108. }
  109. func (f *failureLeader) Recover(clus *Cluster) error {
  110. return f.recoverMember(clus, f.last)
  111. }
  112. func (f *failureLeader) FailureCase() rpcpb.FailureCase {
  113. return f.failureCase
  114. }
// failureQuorum applies the failureByFunc callbacks to the members selected
// by killMap, i.e. len(clus.Members)/2+1 distinct member indexes.
type failureQuorum failureByFunc
  116. func (f *failureQuorum) Inject(clus *Cluster) error {
  117. for i := range killMap(len(clus.Members), clus.rd) {
  118. if err := f.injectMember(clus, i); err != nil {
  119. return err
  120. }
  121. }
  122. return nil
  123. }
  124. func (f *failureQuorum) Recover(clus *Cluster) error {
  125. for i := range killMap(len(clus.Members), clus.rd) {
  126. if err := f.recoverMember(clus, i); err != nil {
  127. return err
  128. }
  129. }
  130. return nil
  131. }
  132. func (f *failureQuorum) FailureCase() rpcpb.FailureCase { return f.failureCase }
// failureAll applies the failureByFunc callbacks to every member in
// clus.Members.
type failureAll failureByFunc
  134. func (f *failureAll) Inject(clus *Cluster) error {
  135. for i := range clus.Members {
  136. if err := f.injectMember(clus, i); err != nil {
  137. return err
  138. }
  139. }
  140. return nil
  141. }
  142. func (f *failureAll) Recover(clus *Cluster) error {
  143. for i := range clus.Members {
  144. if err := f.recoverMember(clus, i); err != nil {
  145. return err
  146. }
  147. }
  148. return nil
  149. }
  150. func (f *failureAll) FailureCase() rpcpb.FailureCase {
  151. return f.failureCase
  152. }
// failureUntilSnapshot wraps another Failure: its Inject runs the wrapped
// Inject and then waits until the cluster revision has advanced by more than
// snapshotCount, at which point a new snapshot should have been created.
type failureUntilSnapshot struct {
	// desc is an optional description; when it is empty, Desc falls back to
	// failureCase.String().
	desc        desc
	failureCase rpcpb.FailureCase
	// Failure is the wrapped failure. Methods not redefined on
	// failureUntilSnapshot are promoted from it.
	Failure
}
// snapshotCount is the number of revisions failureUntilSnapshot.Inject waits
// for after injecting the failure. It also sizes the wait budget there.
// NOTE(review): presumably this matches the snapshot count the etcd members
// are configured with — confirm against the cluster setup.
const snapshotCount = 10000
  160. func (f *failureUntilSnapshot) Inject(clus *Cluster) error {
  161. if err := f.Failure.Inject(clus); err != nil {
  162. return err
  163. }
  164. if len(clus.Members) < 3 {
  165. return nil
  166. }
  167. // maxRev may fail since failure just injected, retry if failed.
  168. startRev, err := clus.maxRev()
  169. for i := 0; i < 10 && startRev == 0; i++ {
  170. startRev, err = clus.maxRev()
  171. }
  172. if startRev == 0 {
  173. return err
  174. }
  175. lastRev := startRev
  176. // Normal healthy cluster could accept 1000req/s at least.
  177. // Give it 3-times time to create a new snapshot.
  178. retry := snapshotCount / 1000 * 3
  179. for j := 0; j < retry; j++ {
  180. lastRev, _ = clus.maxRev()
  181. // If the number of proposals committed is bigger than snapshot count,
  182. // a new snapshot should have been created.
  183. if lastRev-startRev > snapshotCount {
  184. return nil
  185. }
  186. time.Sleep(time.Second)
  187. }
  188. return fmt.Errorf("cluster too slow: only commit %d requests in %ds", lastRev-startRev, retry)
  189. }
  190. func (f *failureUntilSnapshot) Desc() string {
  191. if f.desc.Desc() != "" {
  192. return f.desc.Desc()
  193. }
  194. return f.failureCase.String()
  195. }
  196. func (f *failureUntilSnapshot) FailureCase() rpcpb.FailureCase {
  197. return f.failureCase
  198. }
  199. func killMap(size int, seed int) map[int]bool {
  200. m := make(map[int]bool)
  201. r := rand.New(rand.NewSource(int64(seed)))
  202. majority := size/2 + 1
  203. for {
  204. m[r.Intn(size)] = true
  205. if len(m) >= majority {
  206. return m
  207. }
  208. }
  209. }
  210. type desc string
  211. func (d desc) Desc() string { return string(d) }