stresser.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "strings"
  17. "sync"
  18. "time"
  19. "golang.org/x/time/rate"
  20. "google.golang.org/grpc/grpclog"
  21. )
  22. func init() { grpclog.SetLogger(plog) }
  23. type Stresser interface {
  24. // Stress starts to stress the etcd cluster
  25. Stress() error
  26. // Cancel cancels the stress test on the etcd cluster
  27. Cancel()
  28. // ModifiedKeys reports the number of keys created and deleted by stresser
  29. ModifiedKeys() int64
  30. // Checker returns an invariant checker for after the stresser is canceled.
  31. Checker() Checker
  32. }
  33. // nopStresser implements Stresser that does nothing
  34. type nopStresser struct {
  35. start time.Time
  36. qps int
  37. }
  38. func (s *nopStresser) Stress() error { return nil }
  39. func (s *nopStresser) Cancel() {}
  40. func (s *nopStresser) ModifiedKeys() int64 {
  41. return 0
  42. }
  43. func (s *nopStresser) Checker() Checker { return nil }
  44. // compositeStresser implements a Stresser that runs a slice of
  45. // stressers concurrently.
  46. type compositeStresser struct {
  47. stressers []Stresser
  48. }
  49. func (cs *compositeStresser) Stress() error {
  50. for i, s := range cs.stressers {
  51. if err := s.Stress(); err != nil {
  52. for j := 0; j < i; j++ {
  53. cs.stressers[i].Cancel()
  54. }
  55. return err
  56. }
  57. }
  58. return nil
  59. }
  60. func (cs *compositeStresser) Cancel() {
  61. var wg sync.WaitGroup
  62. wg.Add(len(cs.stressers))
  63. for i := range cs.stressers {
  64. go func(s Stresser) {
  65. defer wg.Done()
  66. s.Cancel()
  67. }(cs.stressers[i])
  68. }
  69. wg.Wait()
  70. }
  71. func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {
  72. for _, stress := range cs.stressers {
  73. modifiedKey += stress.ModifiedKeys()
  74. }
  75. return modifiedKey
  76. }
  77. func (cs *compositeStresser) Checker() Checker {
  78. var chks []Checker
  79. for _, s := range cs.stressers {
  80. if chk := s.Checker(); chk != nil {
  81. chks = append(chks, chk)
  82. }
  83. }
  84. if len(chks) == 0 {
  85. return nil
  86. }
  87. return newCompositeChecker(chks)
  88. }
  89. type stressConfig struct {
  90. keyLargeSize int
  91. keySize int
  92. keySuffixRange int
  93. numLeases int
  94. keysPerLease int
  95. rateLimiter *rate.Limiter
  96. }
  97. // NewStresser creates stresser from a comma separated list of stresser types.
  98. func NewStresser(s string, sc *stressConfig, m *member) Stresser {
  99. types := strings.Split(s, ",")
  100. if len(types) > 1 {
  101. stressers := make([]Stresser, len(types))
  102. for i, stype := range types {
  103. stressers[i] = NewStresser(stype, sc, m)
  104. }
  105. return &compositeStresser{stressers}
  106. }
  107. switch s {
  108. case "nop":
  109. return &nopStresser{start: time.Now(), qps: int(sc.rateLimiter.Limit())}
  110. case "keys":
  111. // TODO: Too intensive stressers can panic etcd member with
  112. // 'out of memory' error. Put rate limits in server side.
  113. return &keyStresser{
  114. Endpoint: m.grpcAddr(),
  115. keyLargeSize: sc.keyLargeSize,
  116. keySize: sc.keySize,
  117. keySuffixRange: sc.keySuffixRange,
  118. N: 100,
  119. rateLimiter: sc.rateLimiter,
  120. }
  121. case "v2keys":
  122. return &v2Stresser{
  123. Endpoint: m.ClientURL,
  124. keySize: sc.keySize,
  125. keySuffixRange: sc.keySuffixRange,
  126. N: 100,
  127. rateLimiter: sc.rateLimiter,
  128. }
  129. case "lease":
  130. return &leaseStresser{
  131. endpoint: m.grpcAddr(),
  132. numLeases: sc.numLeases,
  133. keysPerLease: sc.keysPerLease,
  134. rateLimiter: sc.rateLimiter,
  135. }
  136. default:
  137. plog.Panicf("unknown stresser type: %s\n", s)
  138. }
  139. return nil // never reach here
  140. }