stress.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "fmt"
  17. "sync"
  18. "time"
  19. "go.uber.org/zap"
  20. )
  21. type Stresser interface {
  22. // Stress starts to stress the etcd cluster
  23. Stress() error
  24. // Pause stops the stresser from sending requests to etcd. Resume by calling Stress.
  25. Pause()
  26. // Close releases all of the Stresser's resources.
  27. Close()
  28. // ModifiedKeys reports the number of keys created and deleted by stresser
  29. ModifiedKeys() int64
  30. // Checker returns an invariant checker for after the stresser is canceled.
  31. Checker() Checker
  32. }
  33. // nopStresser implements Stresser that does nothing
  34. type nopStresser struct {
  35. start time.Time
  36. qps int
  37. }
  38. func (s *nopStresser) Stress() error { return nil }
  39. func (s *nopStresser) Pause() {}
  40. func (s *nopStresser) Close() {}
  41. func (s *nopStresser) ModifiedKeys() int64 {
  42. return 0
  43. }
  44. func (s *nopStresser) Checker() Checker { return nil }
  45. // compositeStresser implements a Stresser that runs a slice of
  46. // stressing clients concurrently.
  47. type compositeStresser struct {
  48. stressers []Stresser
  49. }
  50. func (cs *compositeStresser) Stress() error {
  51. for i, s := range cs.stressers {
  52. if err := s.Stress(); err != nil {
  53. for j := 0; j < i; j++ {
  54. cs.stressers[i].Close()
  55. }
  56. return err
  57. }
  58. }
  59. return nil
  60. }
  61. func (cs *compositeStresser) Pause() {
  62. var wg sync.WaitGroup
  63. wg.Add(len(cs.stressers))
  64. for i := range cs.stressers {
  65. go func(s Stresser) {
  66. defer wg.Done()
  67. s.Pause()
  68. }(cs.stressers[i])
  69. }
  70. wg.Wait()
  71. }
  72. func (cs *compositeStresser) Close() {
  73. var wg sync.WaitGroup
  74. wg.Add(len(cs.stressers))
  75. for i := range cs.stressers {
  76. go func(s Stresser) {
  77. defer wg.Done()
  78. s.Close()
  79. }(cs.stressers[i])
  80. }
  81. wg.Wait()
  82. }
  83. func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {
  84. for _, stress := range cs.stressers {
  85. modifiedKey += stress.ModifiedKeys()
  86. }
  87. return modifiedKey
  88. }
  89. func (cs *compositeStresser) Checker() Checker {
  90. var chks []Checker
  91. for _, s := range cs.stressers {
  92. if chk := s.Checker(); chk != nil {
  93. chks = append(chks, chk)
  94. }
  95. }
  96. if len(chks) == 0 {
  97. return nil
  98. }
  99. return newCompositeChecker(chks)
  100. }
  101. // newStresser creates stresser from a comma separated list of stresser types.
  102. func newStresser(clus *Cluster, idx int) Stresser {
  103. stressers := make([]Stresser, len(clus.Tester.StressTypes))
  104. for i, stype := range clus.Tester.StressTypes {
  105. clus.logger.Info("creating stresser", zap.String("type", stype))
  106. switch stype {
  107. case "NO_STRESS":
  108. stressers[i] = &nopStresser{start: time.Now(), qps: int(clus.rateLimiter.Limit())}
  109. case "KV":
  110. // TODO: Too intensive stressing clients can panic etcd member with
  111. // 'out of memory' error. Put rate limits in server side.
  112. stressers[i] = &keyStresser{
  113. logger: clus.logger,
  114. Endpoint: clus.Members[idx].EtcdClientEndpoint,
  115. keySize: int(clus.Tester.StressKeySize),
  116. keyLargeSize: int(clus.Tester.StressKeySizeLarge),
  117. keySuffixRange: int(clus.Tester.StressKeySuffixRange),
  118. keyTxnSuffixRange: int(clus.Tester.StressKeySuffixRangeTxn),
  119. keyTxnOps: int(clus.Tester.StressKeyTxnOps),
  120. N: 100,
  121. rateLimiter: clus.rateLimiter,
  122. }
  123. case "LEASE":
  124. stressers[i] = &leaseStresser{
  125. logger: clus.logger,
  126. endpoint: clus.Members[idx].EtcdClientEndpoint,
  127. numLeases: 10, // TODO: configurable
  128. keysPerLease: 10, // TODO: configurable
  129. rateLimiter: clus.rateLimiter,
  130. }
  131. case "ELECTION_RUNNER":
  132. reqRate := 100
  133. args := []string{
  134. "election",
  135. fmt.Sprintf("%v", time.Now().UnixNano()), // election name as current nano time
  136. "--dial-timeout=10s",
  137. "--endpoints", clus.Members[idx].EtcdClientEndpoint,
  138. "--total-client-connections=10",
  139. "--rounds=0", // runs forever
  140. "--req-rate", fmt.Sprintf("%v", reqRate),
  141. }
  142. stressers[i] = newRunnerStresser(
  143. clus.Tester.RunnerExecPath,
  144. args,
  145. clus.rateLimiter,
  146. reqRate,
  147. )
  148. case "WATCH_RUNNER":
  149. reqRate := 100
  150. args := []string{
  151. "watcher",
  152. "--prefix", fmt.Sprintf("%v", time.Now().UnixNano()), // prefix all keys with nano time
  153. "--total-keys=1",
  154. "--total-prefixes=1",
  155. "--watch-per-prefix=1",
  156. "--endpoints", clus.Members[idx].EtcdClientEndpoint,
  157. "--rounds=0", // runs forever
  158. "--req-rate", fmt.Sprintf("%v", reqRate),
  159. }
  160. stressers[i] = newRunnerStresser(clus.Tester.RunnerExecPath, args, clus.rateLimiter, reqRate)
  161. case "LOCK_RACER_RUNNER":
  162. reqRate := 100
  163. args := []string{
  164. "lock-racer",
  165. fmt.Sprintf("%v", time.Now().UnixNano()), // locker name as current nano time
  166. "--endpoints", clus.Members[idx].EtcdClientEndpoint,
  167. "--total-client-connections=10",
  168. "--rounds=0", // runs forever
  169. "--req-rate", fmt.Sprintf("%v", reqRate),
  170. }
  171. stressers[i] = newRunnerStresser(clus.Tester.RunnerExecPath, args, clus.rateLimiter, reqRate)
  172. case "LEASE_RUNNER":
  173. args := []string{
  174. "lease-renewer",
  175. "--ttl=30",
  176. "--endpoints", clus.Members[idx].EtcdClientEndpoint,
  177. }
  178. stressers[i] = newRunnerStresser(clus.Tester.RunnerExecPath, args, clus.rateLimiter, 0)
  179. }
  180. }
  181. return &compositeStresser{stressers}
  182. }