stresser.go 5.6 KB


  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "fmt"
  17. "strings"
  18. "sync"
  19. "time"
  20. "golang.org/x/time/rate"
  21. )
  22. type Stresser interface {
  23. // Stress starts to stress the etcd cluster
  24. Stress() error
  25. // Pause stops the stresser from sending requests to etcd. Resume by calling Stress.
  26. Pause()
  27. // Close releases all of the Stresser's resources.
  28. Close()
  29. // ModifiedKeys reports the number of keys created and deleted by stresser
  30. ModifiedKeys() int64
  31. // Checker returns an invariant checker for after the stresser is canceled.
  32. Checker() Checker
  33. }
  34. // nopStresser implements Stresser that does nothing
  35. type nopStresser struct {
  36. start time.Time
  37. qps int
  38. }
  39. func (s *nopStresser) Stress() error { return nil }
  40. func (s *nopStresser) Pause() {}
  41. func (s *nopStresser) Close() {}
  42. func (s *nopStresser) ModifiedKeys() int64 {
  43. return 0
  44. }
  45. func (s *nopStresser) Checker() Checker { return nil }
  46. // compositeStresser implements a Stresser that runs a slice of
  47. // stressers concurrently.
  48. type compositeStresser struct {
  49. stressers []Stresser
  50. }
  51. func (cs *compositeStresser) Stress() error {
  52. for i, s := range cs.stressers {
  53. if err := s.Stress(); err != nil {
  54. for j := 0; j < i; j++ {
  55. cs.stressers[i].Close()
  56. }
  57. return err
  58. }
  59. }
  60. return nil
  61. }
  62. func (cs *compositeStresser) Pause() {
  63. var wg sync.WaitGroup
  64. wg.Add(len(cs.stressers))
  65. for i := range cs.stressers {
  66. go func(s Stresser) {
  67. defer wg.Done()
  68. s.Pause()
  69. }(cs.stressers[i])
  70. }
  71. wg.Wait()
  72. }
  73. func (cs *compositeStresser) Close() {
  74. var wg sync.WaitGroup
  75. wg.Add(len(cs.stressers))
  76. for i := range cs.stressers {
  77. go func(s Stresser) {
  78. defer wg.Done()
  79. s.Close()
  80. }(cs.stressers[i])
  81. }
  82. wg.Wait()
  83. }
  84. func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {
  85. for _, stress := range cs.stressers {
  86. modifiedKey += stress.ModifiedKeys()
  87. }
  88. return modifiedKey
  89. }
  90. func (cs *compositeStresser) Checker() Checker {
  91. var chks []Checker
  92. for _, s := range cs.stressers {
  93. if chk := s.Checker(); chk != nil {
  94. chks = append(chks, chk)
  95. }
  96. }
  97. if len(chks) == 0 {
  98. return nil
  99. }
  100. return newCompositeChecker(chks)
  101. }
  102. type stressConfig struct {
  103. keyLargeSize int
  104. keySize int
  105. keySuffixRange int
  106. keyTxnSuffixRange int
  107. keyTxnOps int
  108. numLeases int
  109. keysPerLease int
  110. rateLimiter *rate.Limiter
  111. etcdRunnerPath string
  112. }
  113. // NewStresser creates stresser from a comma separated list of stresser types.
  114. func NewStresser(s string, sc *stressConfig, m *member) Stresser {
  115. types := strings.Split(s, ",")
  116. if len(types) > 1 {
  117. stressers := make([]Stresser, len(types))
  118. for i, stype := range types {
  119. stressers[i] = NewStresser(stype, sc, m)
  120. }
  121. return &compositeStresser{stressers}
  122. }
  123. switch s {
  124. case "nop":
  125. return &nopStresser{start: time.Now(), qps: int(sc.rateLimiter.Limit())}
  126. case "keys":
  127. // TODO: Too intensive stressers can panic etcd member with
  128. // 'out of memory' error. Put rate limits in server side.
  129. return &keyStresser{
  130. Endpoint: m.grpcAddr(),
  131. keyLargeSize: sc.keyLargeSize,
  132. keySize: sc.keySize,
  133. keySuffixRange: sc.keySuffixRange,
  134. keyTxnSuffixRange: sc.keyTxnSuffixRange,
  135. keyTxnOps: sc.keyTxnOps,
  136. N: 100,
  137. rateLimiter: sc.rateLimiter,
  138. }
  139. case "v2keys":
  140. return &v2Stresser{
  141. Endpoint: m.ClientURL,
  142. keySize: sc.keySize,
  143. keySuffixRange: sc.keySuffixRange,
  144. N: 100,
  145. rateLimiter: sc.rateLimiter,
  146. }
  147. case "lease":
  148. return &leaseStresser{
  149. endpoint: m.grpcAddr(),
  150. numLeases: sc.numLeases,
  151. keysPerLease: sc.keysPerLease,
  152. rateLimiter: sc.rateLimiter,
  153. }
  154. case "election-runner":
  155. reqRate := 100
  156. args := []string{
  157. "election",
  158. fmt.Sprintf("%v", time.Now().UnixNano()), // election name as current nano time
  159. "--dial-timeout=10s",
  160. "--endpoints", m.grpcAddr(),
  161. "--total-client-connections=10",
  162. "--rounds=0", // runs forever
  163. "--req-rate", fmt.Sprintf("%v", reqRate),
  164. }
  165. return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
  166. case "watch-runner":
  167. reqRate := 100
  168. args := []string{
  169. "watcher",
  170. "--prefix", fmt.Sprintf("%v", time.Now().UnixNano()), // prefix all keys with nano time
  171. "--total-keys=1",
  172. "--total-prefixes=1",
  173. "--watch-per-prefix=1",
  174. "--endpoints", m.grpcAddr(),
  175. "--rounds=0", // runs forever
  176. "--req-rate", fmt.Sprintf("%v", reqRate),
  177. }
  178. return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
  179. case "lock-racer-runner":
  180. reqRate := 100
  181. args := []string{
  182. "lock-racer",
  183. fmt.Sprintf("%v", time.Now().UnixNano()), // locker name as current nano time
  184. "--endpoints", m.grpcAddr(),
  185. "--total-client-connections=10",
  186. "--rounds=0", // runs forever
  187. "--req-rate", fmt.Sprintf("%v", reqRate),
  188. }
  189. return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
  190. case "lease-runner":
  191. args := []string{
  192. "lease-renewer",
  193. "--ttl=30",
  194. "--endpoints", m.grpcAddr(),
  195. }
  196. return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, 0)
  197. default:
  198. plog.Panicf("unknown stresser type: %s\n", s)
  199. }
  200. return nil // never reach here
  201. }