stresser.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "fmt"
  17. "time"
  18. "go.etcd.io/etcd/functional/rpcpb"
  19. "go.uber.org/zap"
  20. )
  // Stresser describes the operations of a client that puts load on an etcd
  // cluster.
  type Stresser interface {
  	// Stress begins stressing the etcd cluster.
  	Stress() error
  	// Pause makes the stresser stop sending requests to etcd. Calling Stress
  	// again resumes it.
  	Pause() map[string]int
  	// Close frees every resource held by the Stresser.
  	Close() map[string]int
  	// ModifiedKeys returns how many keys the stresser has created and deleted.
  	ModifiedKeys() int64
  }
  32. // newStresser creates stresser from a comma separated list of stresser types.
  33. func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
  34. // TODO: Too intensive stressing clients can panic etcd member with
  35. // 'out of memory' error. Put rate limits in server side.
  36. ks := &keyStresser{
  37. lg: clus.lg,
  38. m: m,
  39. keySize: int(clus.Tester.StressKeySize),
  40. keyLargeSize: int(clus.Tester.StressKeySizeLarge),
  41. keySuffixRange: int(clus.Tester.StressKeySuffixRange),
  42. keyTxnSuffixRange: int(clus.Tester.StressKeySuffixRangeTxn),
  43. keyTxnOps: int(clus.Tester.StressKeyTxnOps),
  44. clientsN: int(clus.Tester.StressClients),
  45. rateLimiter: clus.rateLimiter,
  46. }
  47. ksExist := false
  48. for _, s := range clus.Tester.Stressers {
  49. clus.lg.Info(
  50. "creating stresser",
  51. zap.String("type", s.Type),
  52. zap.Float64("weight", s.Weight),
  53. zap.String("endpoint", m.EtcdClientEndpoint),
  54. )
  55. switch s.Type {
  56. case "KV_WRITE_SMALL":
  57. ksExist = true
  58. ks.weightKVWriteSmall = s.Weight
  59. case "KV_WRITE_LARGE":
  60. ksExist = true
  61. ks.weightKVWriteLarge = s.Weight
  62. case "KV_READ_ONE_KEY":
  63. ksExist = true
  64. ks.weightKVReadOneKey = s.Weight
  65. case "KV_READ_RANGE":
  66. ksExist = true
  67. ks.weightKVReadRange = s.Weight
  68. case "KV_DELETE_ONE_KEY":
  69. ksExist = true
  70. ks.weightKVDeleteOneKey = s.Weight
  71. case "KV_DELETE_RANGE":
  72. ksExist = true
  73. ks.weightKVDeleteRange = s.Weight
  74. case "KV_TXN_WRITE_DELETE":
  75. ksExist = true
  76. ks.weightKVTxnWriteDelete = s.Weight
  77. case "LEASE":
  78. stressers = append(stressers, &leaseStresser{
  79. stype: rpcpb.StresserType_LEASE,
  80. lg: clus.lg,
  81. m: m,
  82. numLeases: 10, // TODO: configurable
  83. keysPerLease: 10, // TODO: configurable
  84. rateLimiter: clus.rateLimiter,
  85. })
  86. case "ELECTION_RUNNER":
  87. reqRate := 100
  88. args := []string{
  89. "election",
  90. fmt.Sprintf("%v", time.Now().UnixNano()), // election name as current nano time
  91. "--dial-timeout=10s",
  92. "--endpoints", m.EtcdClientEndpoint,
  93. "--total-client-connections=10",
  94. "--rounds=0", // runs forever
  95. "--req-rate", fmt.Sprintf("%v", reqRate),
  96. }
  97. stressers = append(stressers, newRunnerStresser(
  98. rpcpb.StresserType_ELECTION_RUNNER,
  99. m.EtcdClientEndpoint,
  100. clus.lg,
  101. clus.Tester.RunnerExecPath,
  102. args,
  103. clus.rateLimiter,
  104. reqRate,
  105. ))
  106. case "WATCH_RUNNER":
  107. reqRate := 100
  108. args := []string{
  109. "watcher",
  110. "--prefix", fmt.Sprintf("%v", time.Now().UnixNano()), // prefix all keys with nano time
  111. "--total-keys=1",
  112. "--total-prefixes=1",
  113. "--watch-per-prefix=1",
  114. "--endpoints", m.EtcdClientEndpoint,
  115. "--rounds=0", // runs forever
  116. "--req-rate", fmt.Sprintf("%v", reqRate),
  117. }
  118. stressers = append(stressers, newRunnerStresser(
  119. rpcpb.StresserType_WATCH_RUNNER,
  120. m.EtcdClientEndpoint,
  121. clus.lg,
  122. clus.Tester.RunnerExecPath,
  123. args,
  124. clus.rateLimiter,
  125. reqRate,
  126. ))
  127. case "LOCK_RACER_RUNNER":
  128. reqRate := 100
  129. args := []string{
  130. "lock-racer",
  131. fmt.Sprintf("%v", time.Now().UnixNano()), // locker name as current nano time
  132. "--endpoints", m.EtcdClientEndpoint,
  133. "--total-client-connections=10",
  134. "--rounds=0", // runs forever
  135. "--req-rate", fmt.Sprintf("%v", reqRate),
  136. }
  137. stressers = append(stressers, newRunnerStresser(
  138. rpcpb.StresserType_LOCK_RACER_RUNNER,
  139. m.EtcdClientEndpoint,
  140. clus.lg,
  141. clus.Tester.RunnerExecPath,
  142. args,
  143. clus.rateLimiter,
  144. reqRate,
  145. ))
  146. case "LEASE_RUNNER":
  147. args := []string{
  148. "lease-renewer",
  149. "--ttl=30",
  150. "--endpoints", m.EtcdClientEndpoint,
  151. }
  152. stressers = append(stressers, newRunnerStresser(
  153. rpcpb.StresserType_LEASE_RUNNER,
  154. m.EtcdClientEndpoint,
  155. clus.lg,
  156. clus.Tester.RunnerExecPath,
  157. args,
  158. clus.rateLimiter,
  159. 0,
  160. ))
  161. }
  162. }
  163. if ksExist {
  164. return append(stressers, ks)
  165. }
  166. return stressers
  167. }