case_network_delay.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "time"
  17. "go.etcd.io/etcd/functional/rpcpb"
  18. "go.uber.org/zap"
  19. )
  20. const (
  21. // Wait more when it recovers from slow network, because network layer
  22. // needs extra time to propagate traffic control (tc command) change.
  23. // Otherwise, we get different hash values from the previous revision.
  24. // For more detail, please see https://github.com/etcd-io/etcd/issues/5121.
  25. waitRecover = 5 * time.Second
  26. )
  27. func inject_DELAY_PEER_PORT_TX_RX(clus *Cluster, idx int) error {
  28. clus.lg.Info(
  29. "injecting delay latency",
  30. zap.Duration("latency", time.Duration(clus.Tester.UpdatedDelayLatencyMs)*time.Millisecond),
  31. zap.Duration("latency-rv", time.Duration(clus.Tester.DelayLatencyMsRv)*time.Millisecond),
  32. zap.String("endpoint", clus.Members[idx].EtcdClientEndpoint),
  33. )
  34. return clus.sendOp(idx, rpcpb.Operation_DELAY_PEER_PORT_TX_RX)
  35. }
  36. func recover_DELAY_PEER_PORT_TX_RX(clus *Cluster, idx int) error {
  37. err := clus.sendOp(idx, rpcpb.Operation_UNDELAY_PEER_PORT_TX_RX)
  38. time.Sleep(waitRecover)
  39. return err
  40. }
  41. func new_Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER(clus *Cluster, random bool) Case {
  42. cc := caseByFunc{
  43. rpcpbCase: rpcpb.Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER,
  44. injectMember: inject_DELAY_PEER_PORT_TX_RX,
  45. recoverMember: recover_DELAY_PEER_PORT_TX_RX,
  46. }
  47. clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
  48. if random {
  49. clus.UpdateDelayLatencyMs()
  50. cc.rpcpbCase = rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER
  51. }
  52. c := &caseFollower{cc, -1, -1}
  53. return &caseDelay{
  54. Case: c,
  55. delayDuration: clus.GetCaseDelayDuration(),
  56. }
  57. }
  58. func new_Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT(clus *Cluster, random bool) Case {
  59. cc := caseByFunc{
  60. rpcpbCase: rpcpb.Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT,
  61. injectMember: inject_DELAY_PEER_PORT_TX_RX,
  62. recoverMember: recover_DELAY_PEER_PORT_TX_RX,
  63. }
  64. clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
  65. if random {
  66. clus.UpdateDelayLatencyMs()
  67. cc.rpcpbCase = rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT
  68. }
  69. c := &caseFollower{cc, -1, -1}
  70. return &caseUntilSnapshot{
  71. rpcpbCase: cc.rpcpbCase,
  72. Case: c,
  73. }
  74. }
  75. func new_Case_DELAY_PEER_PORT_TX_RX_LEADER(clus *Cluster, random bool) Case {
  76. cc := caseByFunc{
  77. rpcpbCase: rpcpb.Case_DELAY_PEER_PORT_TX_RX_LEADER,
  78. injectMember: inject_DELAY_PEER_PORT_TX_RX,
  79. recoverMember: recover_DELAY_PEER_PORT_TX_RX,
  80. }
  81. clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
  82. if random {
  83. clus.UpdateDelayLatencyMs()
  84. cc.rpcpbCase = rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER
  85. }
  86. c := &caseLeader{cc, -1, -1}
  87. return &caseDelay{
  88. Case: c,
  89. delayDuration: clus.GetCaseDelayDuration(),
  90. }
  91. }
  92. func new_Case_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT(clus *Cluster, random bool) Case {
  93. cc := caseByFunc{
  94. rpcpbCase: rpcpb.Case_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT,
  95. injectMember: inject_DELAY_PEER_PORT_TX_RX,
  96. recoverMember: recover_DELAY_PEER_PORT_TX_RX,
  97. }
  98. clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
  99. if random {
  100. clus.UpdateDelayLatencyMs()
  101. cc.rpcpbCase = rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT
  102. }
  103. c := &caseLeader{cc, -1, -1}
  104. return &caseUntilSnapshot{
  105. rpcpbCase: cc.rpcpbCase,
  106. Case: c,
  107. }
  108. }
  109. func new_Case_DELAY_PEER_PORT_TX_RX_QUORUM(clus *Cluster, random bool) Case {
  110. c := &caseQuorum{
  111. caseByFunc: caseByFunc{
  112. rpcpbCase: rpcpb.Case_DELAY_PEER_PORT_TX_RX_QUORUM,
  113. injectMember: inject_DELAY_PEER_PORT_TX_RX,
  114. recoverMember: recover_DELAY_PEER_PORT_TX_RX,
  115. },
  116. injected: make(map[int]struct{}),
  117. }
  118. clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
  119. if random {
  120. clus.UpdateDelayLatencyMs()
  121. c.rpcpbCase = rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM
  122. }
  123. return &caseDelay{
  124. Case: c,
  125. delayDuration: clus.GetCaseDelayDuration(),
  126. }
  127. }
  128. func new_Case_DELAY_PEER_PORT_TX_RX_ALL(clus *Cluster, random bool) Case {
  129. c := &caseAll{
  130. rpcpbCase: rpcpb.Case_DELAY_PEER_PORT_TX_RX_ALL,
  131. injectMember: inject_DELAY_PEER_PORT_TX_RX,
  132. recoverMember: recover_DELAY_PEER_PORT_TX_RX,
  133. }
  134. clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs
  135. if random {
  136. clus.UpdateDelayLatencyMs()
  137. c.rpcpbCase = rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ALL
  138. }
  139. return &caseDelay{
  140. Case: c,
  141. delayDuration: clus.GetCaseDelayDuration(),
  142. }
  143. }