stresser.go 5.4 KB


  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "fmt"
  17. "strings"
  18. "sync"
  19. "time"
  20. "golang.org/x/time/rate"
  21. )
  22. type Stresser interface {
  23. // Stress starts to stress the etcd cluster
  24. Stress() error
  25. // Pause stops the stresser from sending requests to etcd. Resume by calling Stress.
  26. Pause()
  27. // Close releases all of the Stresser's resources.
  28. Close()
  29. // ModifiedKeys reports the number of keys created and deleted by stresser
  30. ModifiedKeys() int64
  31. // Checker returns an invariant checker for after the stresser is canceled.
  32. Checker() Checker
  33. }
  34. // nopStresser implements Stresser that does nothing
  35. type nopStresser struct {
  36. start time.Time
  37. qps int
  38. }
  39. func (s *nopStresser) Stress() error { return nil }
  40. func (s *nopStresser) Pause() {}
  41. func (s *nopStresser) Close() {}
  42. func (s *nopStresser) ModifiedKeys() int64 {
  43. return 0
  44. }
  45. func (s *nopStresser) Checker() Checker { return nil }
  46. // compositeStresser implements a Stresser that runs a slice of
  47. // stressers concurrently.
  48. type compositeStresser struct {
  49. stressers []Stresser
  50. }
  51. func (cs *compositeStresser) Stress() error {
  52. for i, s := range cs.stressers {
  53. if err := s.Stress(); err != nil {
  54. for j := 0; j < i; j++ {
  55. cs.stressers[i].Close()
  56. }
  57. return err
  58. }
  59. }
  60. return nil
  61. }
  62. func (cs *compositeStresser) Pause() {
  63. var wg sync.WaitGroup
  64. wg.Add(len(cs.stressers))
  65. for i := range cs.stressers {
  66. go func(s Stresser) {
  67. defer wg.Done()
  68. s.Pause()
  69. }(cs.stressers[i])
  70. }
  71. wg.Wait()
  72. }
  73. func (cs *compositeStresser) Close() {
  74. var wg sync.WaitGroup
  75. wg.Add(len(cs.stressers))
  76. for i := range cs.stressers {
  77. go func(s Stresser) {
  78. defer wg.Done()
  79. s.Close()
  80. }(cs.stressers[i])
  81. }
  82. wg.Wait()
  83. }
  84. func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {
  85. for _, stress := range cs.stressers {
  86. modifiedKey += stress.ModifiedKeys()
  87. }
  88. return modifiedKey
  89. }
  90. func (cs *compositeStresser) Checker() Checker {
  91. var chks []Checker
  92. for _, s := range cs.stressers {
  93. if chk := s.Checker(); chk != nil {
  94. chks = append(chks, chk)
  95. }
  96. }
  97. if len(chks) == 0 {
  98. return nil
  99. }
  100. return newCompositeChecker(chks)
  101. }
  102. type stressConfig struct {
  103. keyLargeSize int
  104. keySize int
  105. keySuffixRange int
  106. numLeases int
  107. keysPerLease int
  108. rateLimiter *rate.Limiter
  109. etcdRunnerPath string
  110. }
  111. // NewStresser creates stresser from a comma separated list of stresser types.
  112. func NewStresser(s string, sc *stressConfig, m *member) Stresser {
  113. types := strings.Split(s, ",")
  114. if len(types) > 1 {
  115. stressers := make([]Stresser, len(types))
  116. for i, stype := range types {
  117. stressers[i] = NewStresser(stype, sc, m)
  118. }
  119. return &compositeStresser{stressers}
  120. }
  121. switch s {
  122. case "nop":
  123. return &nopStresser{start: time.Now(), qps: int(sc.rateLimiter.Limit())}
  124. case "keys":
  125. // TODO: Too intensive stressers can panic etcd member with
  126. // 'out of memory' error. Put rate limits in server side.
  127. return &keyStresser{
  128. Endpoint: m.grpcAddr(),
  129. keyLargeSize: sc.keyLargeSize,
  130. keySize: sc.keySize,
  131. keySuffixRange: sc.keySuffixRange,
  132. N: 100,
  133. rateLimiter: sc.rateLimiter,
  134. }
  135. case "v2keys":
  136. return &v2Stresser{
  137. Endpoint: m.ClientURL,
  138. keySize: sc.keySize,
  139. keySuffixRange: sc.keySuffixRange,
  140. N: 100,
  141. rateLimiter: sc.rateLimiter,
  142. }
  143. case "lease":
  144. return &leaseStresser{
  145. endpoint: m.grpcAddr(),
  146. numLeases: sc.numLeases,
  147. keysPerLease: sc.keysPerLease,
  148. rateLimiter: sc.rateLimiter,
  149. }
  150. case "election-runner":
  151. reqRate := 100
  152. args := []string{
  153. "election",
  154. fmt.Sprintf("%v", time.Now().UnixNano()), // election name as current nano time
  155. "--dial-timeout=10s",
  156. "--endpoints", m.grpcAddr(),
  157. "--total-client-connections=10",
  158. "--rounds=0", // runs forever
  159. "--req-rate", fmt.Sprintf("%v", reqRate),
  160. }
  161. return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
  162. case "watch-runner":
  163. reqRate := 100
  164. args := []string{
  165. "watcher",
  166. "--prefix", fmt.Sprintf("%v", time.Now().UnixNano()), // prefix all keys with nano time
  167. "--total-keys=1",
  168. "--total-prefixes=1",
  169. "--watch-per-prefix=1",
  170. "--endpoints", m.grpcAddr(),
  171. "--rounds=0", // runs forever
  172. "--req-rate", fmt.Sprintf("%v", reqRate),
  173. }
  174. return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
  175. case "lock-racer-runner":
  176. reqRate := 100
  177. args := []string{
  178. "lock-racer",
  179. fmt.Sprintf("%v", time.Now().UnixNano()), // locker name as current nano time
  180. "--endpoints", m.grpcAddr(),
  181. "--total-client-connections=10",
  182. "--rounds=0", // runs forever
  183. "--req-rate", fmt.Sprintf("%v", reqRate),
  184. }
  185. return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
  186. case "lease-runner":
  187. args := []string{
  188. "lease-renewer",
  189. "--ttl=30",
  190. "--endpoints", m.grpcAddr(),
  191. }
  192. return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, 0)
  193. default:
  194. plog.Panicf("unknown stresser type: %s\n", s)
  195. }
  196. return nil // never reach here
  197. }