stresser.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "fmt"
  17. "os"
  18. "strings"
  19. "sync"
  20. "time"
  21. "golang.org/x/time/rate"
  22. "google.golang.org/grpc/grpclog"
  23. )
  24. func init() { grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr)) }
  25. type Stresser interface {
  26. // Stress starts to stress the etcd cluster
  27. Stress() error
  28. // Pause stops the stresser from sending requests to etcd. Resume by calling Stress.
  29. Pause()
  30. // Close releases all of the Stresser's resources.
  31. Close()
  32. // ModifiedKeys reports the number of keys created and deleted by stresser
  33. ModifiedKeys() int64
  34. // Checker returns an invariant checker for after the stresser is canceled.
  35. Checker() Checker
  36. }
  37. // nopStresser implements Stresser that does nothing
  38. type nopStresser struct {
  39. start time.Time
  40. qps int
  41. }
  42. func (s *nopStresser) Stress() error { return nil }
  43. func (s *nopStresser) Pause() {}
  44. func (s *nopStresser) Close() {}
  45. func (s *nopStresser) ModifiedKeys() int64 {
  46. return 0
  47. }
  48. func (s *nopStresser) Checker() Checker { return nil }
  49. // compositeStresser implements a Stresser that runs a slice of
  50. // stressers concurrently.
  51. type compositeStresser struct {
  52. stressers []Stresser
  53. }
  54. func (cs *compositeStresser) Stress() error {
  55. for i, s := range cs.stressers {
  56. if err := s.Stress(); err != nil {
  57. for j := 0; j < i; j++ {
  58. cs.stressers[i].Close()
  59. }
  60. return err
  61. }
  62. }
  63. return nil
  64. }
  65. func (cs *compositeStresser) Pause() {
  66. var wg sync.WaitGroup
  67. wg.Add(len(cs.stressers))
  68. for i := range cs.stressers {
  69. go func(s Stresser) {
  70. defer wg.Done()
  71. s.Pause()
  72. }(cs.stressers[i])
  73. }
  74. wg.Wait()
  75. }
  76. func (cs *compositeStresser) Close() {
  77. var wg sync.WaitGroup
  78. wg.Add(len(cs.stressers))
  79. for i := range cs.stressers {
  80. go func(s Stresser) {
  81. defer wg.Done()
  82. s.Close()
  83. }(cs.stressers[i])
  84. }
  85. wg.Wait()
  86. }
  87. func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {
  88. for _, stress := range cs.stressers {
  89. modifiedKey += stress.ModifiedKeys()
  90. }
  91. return modifiedKey
  92. }
  93. func (cs *compositeStresser) Checker() Checker {
  94. var chks []Checker
  95. for _, s := range cs.stressers {
  96. if chk := s.Checker(); chk != nil {
  97. chks = append(chks, chk)
  98. }
  99. }
  100. if len(chks) == 0 {
  101. return nil
  102. }
  103. return newCompositeChecker(chks)
  104. }
  105. type stressConfig struct {
  106. keyLargeSize int
  107. keySize int
  108. keySuffixRange int
  109. numLeases int
  110. keysPerLease int
  111. rateLimiter *rate.Limiter
  112. etcdRunnerPath string
  113. }
  114. // NewStresser creates stresser from a comma separated list of stresser types.
  115. func NewStresser(s string, sc *stressConfig, m *member) Stresser {
  116. types := strings.Split(s, ",")
  117. if len(types) > 1 {
  118. stressers := make([]Stresser, len(types))
  119. for i, stype := range types {
  120. stressers[i] = NewStresser(stype, sc, m)
  121. }
  122. return &compositeStresser{stressers}
  123. }
  124. switch s {
  125. case "nop":
  126. return &nopStresser{start: time.Now(), qps: int(sc.rateLimiter.Limit())}
  127. case "keys":
  128. // TODO: Too intensive stressers can panic etcd member with
  129. // 'out of memory' error. Put rate limits in server side.
  130. return &keyStresser{
  131. Endpoint: m.grpcAddr(),
  132. keyLargeSize: sc.keyLargeSize,
  133. keySize: sc.keySize,
  134. keySuffixRange: sc.keySuffixRange,
  135. N: 100,
  136. rateLimiter: sc.rateLimiter,
  137. }
  138. case "v2keys":
  139. return &v2Stresser{
  140. Endpoint: m.ClientURL,
  141. keySize: sc.keySize,
  142. keySuffixRange: sc.keySuffixRange,
  143. N: 100,
  144. rateLimiter: sc.rateLimiter,
  145. }
  146. case "lease":
  147. return &leaseStresser{
  148. endpoint: m.grpcAddr(),
  149. numLeases: sc.numLeases,
  150. keysPerLease: sc.keysPerLease,
  151. rateLimiter: sc.rateLimiter,
  152. }
  153. case "election-runner":
  154. reqRate := 100
  155. args := []string{
  156. "election",
  157. fmt.Sprintf("%v", time.Now().UnixNano()), // election name as current nano time
  158. "--dial-timeout=10s",
  159. "--endpoints", m.grpcAddr(),
  160. "--total-client-connections=10",
  161. "--rounds=0", // runs forever
  162. "--req-rate", fmt.Sprintf("%v", reqRate),
  163. }
  164. return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
  165. case "watch-runner":
  166. reqRate := 100
  167. args := []string{
  168. "watcher",
  169. "--prefix", fmt.Sprintf("%v", time.Now().UnixNano()), // prefix all keys with nano time
  170. "--total-keys=1",
  171. "--total-prefixes=1",
  172. "--watch-per-prefix=1",
  173. "--endpoints", m.grpcAddr(),
  174. "--rounds=0", // runs forever
  175. "--req-rate", fmt.Sprintf("%v", reqRate),
  176. }
  177. return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
  178. case "lock-racer-runner":
  179. reqRate := 100
  180. args := []string{
  181. "lock-racer",
  182. fmt.Sprintf("%v", time.Now().UnixNano()), // locker name as current nano time
  183. "--endpoints", m.grpcAddr(),
  184. "--total-client-connections=10",
  185. "--rounds=0", // runs forever
  186. "--req-rate", fmt.Sprintf("%v", reqRate),
  187. }
  188. return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, reqRate)
  189. case "lease-runner":
  190. args := []string{
  191. "lease-renewer",
  192. "--ttl=30",
  193. "--endpoints", m.grpcAddr(),
  194. }
  195. return newRunnerStresser(sc.etcdRunnerPath, args, sc.rateLimiter, 0)
  196. default:
  197. plog.Panicf("unknown stresser type: %s\n", s)
  198. }
  199. return nil // never reach here
  200. }