failure.go 7.7 KB


  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "fmt"
  17. "math/rand"
  18. "time"
  19. "github.com/coreos/etcd/functional/rpcpb"
  20. "go.uber.org/zap"
  21. )
  22. // Failure defines failure injection interface.
  23. // To add a fail case:
  24. // 1. implement "Failure" interface
  25. // 2. define fail case name in "rpcpb.FailureCase"
  26. type Failure interface {
  27. // Inject injeccts the failure into the testing cluster at the given
  28. // round. When calling the function, the cluster should be in health.
  29. Inject(clus *Cluster) error
  30. // Recover recovers the injected failure caused by the injection of the
  31. // given round and wait for the recovery of the testing cluster.
  32. Recover(clus *Cluster) error
  33. // Desc returns a description of the failure
  34. Desc() string
  35. // FailureCase returns "rpcpb.FailureCase" enum type.
  36. FailureCase() rpcpb.FailureCase
  37. }
  38. type injectMemberFunc func(*Cluster, int) error
  39. type recoverMemberFunc func(*Cluster, int) error
  40. type failureByFunc struct {
  41. desc string
  42. failureCase rpcpb.FailureCase
  43. injectMember injectMemberFunc
  44. recoverMember recoverMemberFunc
  45. }
  46. func (f *failureByFunc) Desc() string {
  47. if f.desc != "" {
  48. return f.desc
  49. }
  50. return f.failureCase.String()
  51. }
  52. func (f *failureByFunc) FailureCase() rpcpb.FailureCase {
  53. return f.failureCase
  54. }
  55. type failureFollower struct {
  56. failureByFunc
  57. last int
  58. lead int
  59. }
  60. func (f *failureFollower) updateIndex(clus *Cluster) error {
  61. idx, err := clus.GetLeader()
  62. if err != nil {
  63. return err
  64. }
  65. f.lead = idx
  66. n := len(clus.Members)
  67. if f.last == -1 { // first run
  68. f.last = clus.rd % n
  69. if f.last == f.lead {
  70. f.last = (f.last + 1) % n
  71. }
  72. } else {
  73. f.last = (f.last + 1) % n
  74. if f.last == f.lead {
  75. f.last = (f.last + 1) % n
  76. }
  77. }
  78. return nil
  79. }
  80. func (f *failureFollower) Inject(clus *Cluster) error {
  81. if err := f.updateIndex(clus); err != nil {
  82. return err
  83. }
  84. return f.injectMember(clus, f.last)
  85. }
  86. func (f *failureFollower) Recover(clus *Cluster) error {
  87. return f.recoverMember(clus, f.last)
  88. }
  89. func (f *failureFollower) Desc() string {
  90. if f.desc != "" {
  91. return f.desc
  92. }
  93. return f.failureCase.String()
  94. }
  95. func (f *failureFollower) FailureCase() rpcpb.FailureCase {
  96. return f.failureCase
  97. }
  98. type failureLeader struct {
  99. failureByFunc
  100. last int
  101. lead int
  102. }
  103. func (f *failureLeader) updateIndex(clus *Cluster) error {
  104. idx, err := clus.GetLeader()
  105. if err != nil {
  106. return err
  107. }
  108. f.lead = idx
  109. f.last = idx
  110. return nil
  111. }
  112. func (f *failureLeader) Inject(clus *Cluster) error {
  113. if err := f.updateIndex(clus); err != nil {
  114. return err
  115. }
  116. return f.injectMember(clus, f.last)
  117. }
  118. func (f *failureLeader) Recover(clus *Cluster) error {
  119. return f.recoverMember(clus, f.last)
  120. }
  121. func (f *failureLeader) FailureCase() rpcpb.FailureCase {
  122. return f.failureCase
  123. }
  124. type failureQuorum failureByFunc
  125. func (f *failureQuorum) Inject(clus *Cluster) error {
  126. for i := range killMap(len(clus.Members), clus.rd) {
  127. if err := f.injectMember(clus, i); err != nil {
  128. return err
  129. }
  130. }
  131. return nil
  132. }
  133. func (f *failureQuorum) Recover(clus *Cluster) error {
  134. for i := range killMap(len(clus.Members), clus.rd) {
  135. if err := f.recoverMember(clus, i); err != nil {
  136. return err
  137. }
  138. }
  139. return nil
  140. }
  141. func (f *failureQuorum) Desc() string {
  142. if f.desc != "" {
  143. return f.desc
  144. }
  145. return f.failureCase.String()
  146. }
  147. func (f *failureQuorum) FailureCase() rpcpb.FailureCase {
  148. return f.failureCase
  149. }
  150. func killMap(size int, seed int) map[int]bool {
  151. m := make(map[int]bool)
  152. r := rand.New(rand.NewSource(int64(seed)))
  153. majority := size/2 + 1
  154. for {
  155. m[r.Intn(size)] = true
  156. if len(m) >= majority {
  157. return m
  158. }
  159. }
  160. }
  161. type failureAll failureByFunc
  162. func (f *failureAll) Inject(clus *Cluster) error {
  163. for i := range clus.Members {
  164. if err := f.injectMember(clus, i); err != nil {
  165. return err
  166. }
  167. }
  168. return nil
  169. }
  170. func (f *failureAll) Recover(clus *Cluster) error {
  171. for i := range clus.Members {
  172. if err := f.recoverMember(clus, i); err != nil {
  173. return err
  174. }
  175. }
  176. return nil
  177. }
  178. func (f *failureAll) Desc() string {
  179. if f.desc != "" {
  180. return f.desc
  181. }
  182. return f.failureCase.String()
  183. }
  184. func (f *failureAll) FailureCase() rpcpb.FailureCase {
  185. return f.failureCase
  186. }
  187. // failureUntilSnapshot injects a failure and waits for a snapshot event
  188. type failureUntilSnapshot struct {
  189. desc string
  190. failureCase rpcpb.FailureCase
  191. Failure
  192. }
  193. // all delay failure cases except the ones failing with latency
  194. // greater than election timeout (trigger leader election and
  195. // cluster keeps operating anyways)
  196. var slowCases = map[rpcpb.FailureCase]bool{
  197. rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER: true,
  198. rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: true,
  199. rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: true,
  200. rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER: true,
  201. rpcpb.FailureCase_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT: true,
  202. rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT: true,
  203. rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM: true,
  204. rpcpb.FailureCase_RANDOM_DELAY_PEER_PORT_TX_RX_ALL: true,
  205. }
  206. func (f *failureUntilSnapshot) Inject(clus *Cluster) error {
  207. if err := f.Failure.Inject(clus); err != nil {
  208. return err
  209. }
  210. if len(clus.Members) < 3 {
  211. return nil
  212. }
  213. snapshotCount := clus.Members[0].Etcd.SnapshotCount
  214. now := time.Now()
  215. clus.lg.Info(
  216. "trigger snapshot START",
  217. zap.String("desc", f.Desc()),
  218. zap.Int64("etcd-snapshot-count", snapshotCount),
  219. )
  220. // maxRev may fail since failure just injected, retry if failed.
  221. startRev, err := clus.maxRev()
  222. for i := 0; i < 10 && startRev == 0; i++ {
  223. startRev, err = clus.maxRev()
  224. }
  225. if startRev == 0 {
  226. return err
  227. }
  228. lastRev := startRev
  229. // healthy cluster could accept 1000 req/sec at least.
  230. // 3x time to trigger snapshot.
  231. retries := int(snapshotCount) / 1000 * 3
  232. if v, ok := slowCases[f.FailureCase()]; v && ok {
  233. // slow network takes more retries
  234. retries *= 5
  235. }
  236. for i := 0; i < retries; i++ {
  237. lastRev, _ = clus.maxRev()
  238. // If the number of proposals committed is bigger than snapshot count,
  239. // a new snapshot should have been created.
  240. diff := lastRev - startRev
  241. if diff > snapshotCount {
  242. clus.lg.Info(
  243. "trigger snapshot PASS",
  244. zap.Int("retries", i),
  245. zap.String("desc", f.Desc()),
  246. zap.Int64("committed-entries", diff),
  247. zap.Int64("etcd-snapshot-count", snapshotCount),
  248. zap.Int64("last-revision", lastRev),
  249. zap.Duration("took", time.Since(now)),
  250. )
  251. return nil
  252. }
  253. clus.lg.Info(
  254. "trigger snapshot PROGRESS",
  255. zap.Int("retries", i),
  256. zap.Int64("committed-entries", diff),
  257. zap.Int64("etcd-snapshot-count", snapshotCount),
  258. zap.Int64("last-revision", lastRev),
  259. zap.Duration("took", time.Since(now)),
  260. )
  261. time.Sleep(time.Second)
  262. }
  263. return fmt.Errorf("cluster too slow: only %d commits in %d retries", lastRev-startRev, retries)
  264. }
  265. func (f *failureUntilSnapshot) Desc() string {
  266. if f.desc != "" {
  267. return f.desc
  268. }
  269. if f.failureCase.String() != "" {
  270. return f.failureCase.String()
  271. }
  272. return f.Failure.Desc()
  273. }
  274. func (f *failureUntilSnapshot) FailureCase() rpcpb.FailureCase {
  275. return f.failureCase
  276. }