stresser.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "fmt"
  17. "strings"
  18. "sync"
  19. "time"
  20. "golang.org/x/time/rate"
  21. "google.golang.org/grpc/grpclog"
  22. )
  23. func init() { grpclog.SetLogger(plog) }
  24. type Stresser interface {
  25. // Stress starts to stress the etcd cluster
  26. Stress() error
  27. // Pause stops the stresser from sending requests to etcd. Resume by calling Stress.
  28. Pause()
  29. // Close releases all of the Stresser's resources.
  30. Close()
  31. // ModifiedKeys reports the number of keys created and deleted by stresser
  32. ModifiedKeys() int64
  33. // Checker returns an invariant checker for after the stresser is canceled.
  34. Checker() Checker
  35. }
  36. // nopStresser implements Stresser that does nothing
  37. type nopStresser struct {
  38. start time.Time
  39. qps int
  40. }
  41. func (s *nopStresser) Stress() error { return nil }
  42. func (s *nopStresser) Pause() {}
  43. func (s *nopStresser) Close() {}
  44. func (s *nopStresser) ModifiedKeys() int64 {
  45. return 0
  46. }
  47. func (s *nopStresser) Checker() Checker { return nil }
  48. // compositeStresser implements a Stresser that runs a slice of
  49. // stressers concurrently.
  50. type compositeStresser struct {
  51. stressers []Stresser
  52. }
  53. func (cs *compositeStresser) Stress() error {
  54. for i, s := range cs.stressers {
  55. if err := s.Stress(); err != nil {
  56. for j := 0; j < i; j++ {
  57. cs.stressers[i].Close()
  58. }
  59. return err
  60. }
  61. }
  62. return nil
  63. }
  64. func (cs *compositeStresser) Pause() {
  65. var wg sync.WaitGroup
  66. wg.Add(len(cs.stressers))
  67. for i := range cs.stressers {
  68. go func(s Stresser) {
  69. defer wg.Done()
  70. s.Pause()
  71. }(cs.stressers[i])
  72. }
  73. wg.Wait()
  74. }
  75. func (cs *compositeStresser) Close() {
  76. var wg sync.WaitGroup
  77. wg.Add(len(cs.stressers))
  78. for i := range cs.stressers {
  79. go func(s Stresser) {
  80. defer wg.Done()
  81. s.Close()
  82. }(cs.stressers[i])
  83. }
  84. wg.Wait()
  85. }
  86. func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {
  87. for _, stress := range cs.stressers {
  88. modifiedKey += stress.ModifiedKeys()
  89. }
  90. return modifiedKey
  91. }
  92. func (cs *compositeStresser) Checker() Checker {
  93. var chks []Checker
  94. for _, s := range cs.stressers {
  95. if chk := s.Checker(); chk != nil {
  96. chks = append(chks, chk)
  97. }
  98. }
  99. if len(chks) == 0 {
  100. return nil
  101. }
  102. return newCompositeChecker(chks)
  103. }
  104. type stressConfig struct {
  105. keyLargeSize int
  106. keySize int
  107. keySuffixRange int
  108. numLeases int
  109. keysPerLease int
  110. rateLimiter *rate.Limiter
  111. etcdRunnerPath string
  112. }
  113. // NewStresser creates stresser from a comma separated list of stresser types.
  114. func NewStresser(s string, sc *stressConfig, m *member) Stresser {
  115. types := strings.Split(s, ",")
  116. if len(types) > 1 {
  117. stressers := make([]Stresser, len(types))
  118. for i, stype := range types {
  119. stressers[i] = NewStresser(stype, sc, m)
  120. }
  121. return &compositeStresser{stressers}
  122. }
  123. switch s {
  124. case "nop":
  125. return &nopStresser{start: time.Now(), qps: int(sc.rateLimiter.Limit())}
  126. case "keys":
  127. // TODO: Too intensive stressers can panic etcd member with
  128. // 'out of memory' error. Put rate limits in server side.
  129. return &keyStresser{
  130. Endpoint: m.grpcAddr(),
  131. keyLargeSize: sc.keyLargeSize,
  132. keySize: sc.keySize,
  133. keySuffixRange: sc.keySuffixRange,
  134. N: 100,
  135. rateLimiter: sc.rateLimiter,
  136. }
  137. case "v2keys":
  138. return &v2Stresser{
  139. Endpoint: m.ClientURL,
  140. keySize: sc.keySize,
  141. keySuffixRange: sc.keySuffixRange,
  142. N: 100,
  143. rateLimiter: sc.rateLimiter,
  144. }
  145. case "lease":
  146. return &leaseStresser{
  147. endpoint: m.grpcAddr(),
  148. numLeases: sc.numLeases,
  149. keysPerLease: sc.keysPerLease,
  150. rateLimiter: sc.rateLimiter,
  151. }
  152. case "election-runner":
  153. reqRate := 100
  154. args := []string{
  155. "election",
  156. fmt.Sprintf("%v", time.Now().UnixNano()), // election name as current nano time
  157. "--dial-timeout=10s",
  158. "--endpoints", m.grpcAddr(),
  159. "--total-client-connections=10",
  160. "--rounds=0", // runs forever
  161. "--req-rate", fmt.Sprintf("%v", reqRate),
  162. }
  163. return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
  164. case "watch-runner":
  165. reqRate := 100
  166. args := []string{
  167. "watcher",
  168. "--prefix", fmt.Sprintf("%v", time.Now().UnixNano()), // prefix all keys with nano time
  169. "--total-keys=1",
  170. "--total-prefixes=1",
  171. "--watch-per-prefix=1",
  172. "--endpoints", m.grpcAddr(),
  173. "--rounds=0", // runs forever
  174. "--req-rate", fmt.Sprintf("%v", reqRate),
  175. }
  176. return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
  177. case "lock-racer-runner":
  178. reqRate := 100
  179. args := []string{
  180. "lock-racer",
  181. fmt.Sprintf("%v", time.Now().UnixNano()), // locker name as current nano time
  182. "--endpoints", m.grpcAddr(),
  183. "--total-client-connections=10",
  184. "--rounds=0", // runs forever
  185. "--req-rate", fmt.Sprintf("%v", reqRate),
  186. }
  187. return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
  188. case "lease-runner":
  189. args := []string{
  190. "lease-renewer",
  191. "--ttl=30",
  192. "--endpoints", m.grpcAddr(),
  193. }
  194. return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, 0)
  195. default:
  196. plog.Panicf("unknown stresser type: %s\n", s)
  197. }
  198. return nil // never reach here
  199. }