stresser_key.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "context"
  17. "fmt"
  18. "math/rand"
  19. "reflect"
  20. "sync"
  21. "sync/atomic"
  22. "time"
  23. "github.com/coreos/etcd/clientv3"
  24. "github.com/coreos/etcd/etcdserver"
  25. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  26. "github.com/coreos/etcd/functional/rpcpb"
  27. "github.com/coreos/etcd/raft"
  28. "go.uber.org/zap"
  29. "golang.org/x/time/rate"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/transport"
  32. )
  33. type keyStresser struct {
  34. lg *zap.Logger
  35. m *rpcpb.Member
  36. weightKVWriteSmall float64
  37. weightKVWriteLarge float64
  38. weightKVReadOneKey float64
  39. weightKVReadRange float64
  40. weightKVDeleteOneKey float64
  41. weightKVDeleteRange float64
  42. weightKVTxnWriteDelete float64
  43. keySize int
  44. keyLargeSize int
  45. keySuffixRange int
  46. keyTxnSuffixRange int
  47. keyTxnOps int
  48. rateLimiter *rate.Limiter
  49. wg sync.WaitGroup
  50. clientsN int
  51. ctx context.Context
  52. cancel func()
  53. cli *clientv3.Client
  54. emu sync.RWMutex
  55. ems map[string]int
  56. paused bool
  57. // atomicModifiedKeys records the number of keys created and deleted by the stresser.
  58. atomicModifiedKeys int64
  59. stressTable *stressTable
  60. }
  61. func (s *keyStresser) Stress() error {
  62. var err error
  63. s.cli, err = s.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(1 * time.Second))
  64. if err != nil {
  65. return fmt.Errorf("%v (%q)", err, s.m.EtcdClientEndpoint)
  66. }
  67. s.ctx, s.cancel = context.WithCancel(context.Background())
  68. s.wg.Add(s.clientsN)
  69. s.stressTable = createStressTable([]stressEntry{
  70. {weight: s.weightKVWriteSmall, f: newStressPut(s.cli, s.keySuffixRange, s.keySize)},
  71. {weight: s.weightKVWriteLarge, f: newStressPut(s.cli, s.keySuffixRange, s.keyLargeSize)},
  72. {weight: s.weightKVReadOneKey, f: newStressRange(s.cli, s.keySuffixRange)},
  73. {weight: s.weightKVReadRange, f: newStressRangeInterval(s.cli, s.keySuffixRange)},
  74. {weight: s.weightKVDeleteOneKey, f: newStressDelete(s.cli, s.keySuffixRange)},
  75. {weight: s.weightKVDeleteRange, f: newStressDeleteInterval(s.cli, s.keySuffixRange)},
  76. {weight: s.weightKVTxnWriteDelete, f: newStressTxn(s.cli, s.keyTxnSuffixRange, s.keyTxnOps)},
  77. })
  78. s.emu.Lock()
  79. s.paused = false
  80. s.ems = make(map[string]int, 100)
  81. s.emu.Unlock()
  82. for i := 0; i < s.clientsN; i++ {
  83. go s.run()
  84. }
  85. s.lg.Info(
  86. "stress START",
  87. zap.String("stress-type", "KV"),
  88. zap.String("endpoint", s.m.EtcdClientEndpoint),
  89. )
  90. return nil
  91. }
  92. func (s *keyStresser) run() {
  93. defer s.wg.Done()
  94. for {
  95. if err := s.rateLimiter.Wait(s.ctx); err == context.Canceled {
  96. return
  97. }
  98. // TODO: 10-second is enough timeout to cover leader failure
  99. // and immediate leader election. Find out what other cases this
  100. // could be timed out.
  101. sctx, scancel := context.WithTimeout(s.ctx, 10*time.Second)
  102. err, modifiedKeys := s.stressTable.choose()(sctx)
  103. scancel()
  104. if err == nil {
  105. atomic.AddInt64(&s.atomicModifiedKeys, modifiedKeys)
  106. continue
  107. }
  108. switch rpctypes.ErrorDesc(err) {
  109. case context.DeadlineExceeded.Error():
  110. // This retries when request is triggered at the same time as
  111. // leader failure. When we terminate the leader, the request to
  112. // that leader cannot be processed, and times out. Also requests
  113. // to followers cannot be forwarded to the old leader, so timing out
  114. // as well. We want to keep stressing until the cluster elects a
  115. // new leader and start processing requests again.
  116. case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
  117. // This retries when request is triggered at the same time as
  118. // leader failure and follower nodes receive time out errors
  119. // from losing their leader. Followers should retry to connect
  120. // to the new leader.
  121. case etcdserver.ErrStopped.Error():
  122. // one of the etcd nodes stopped from failure injection
  123. case transport.ErrConnClosing.Desc:
  124. // server closed the transport (failure injected node)
  125. case rpctypes.ErrNotCapable.Error():
  126. // capability check has not been done (in the beginning)
  127. case rpctypes.ErrTooManyRequests.Error():
  128. // hitting the recovering member.
  129. case raft.ErrProposalDropped.Error():
  130. // removed member, or leadership has changed (old leader got raftpb.MsgProp)
  131. case context.Canceled.Error():
  132. // from stresser.Cancel method:
  133. return
  134. case grpc.ErrClientConnClosing.Error():
  135. // from stresser.Cancel method:
  136. return
  137. default:
  138. s.lg.Warn(
  139. "stress run exiting",
  140. zap.String("stress-type", "KV"),
  141. zap.String("endpoint", s.m.EtcdClientEndpoint),
  142. zap.String("error-type", reflect.TypeOf(err).String()),
  143. zap.String("error-desc", rpctypes.ErrorDesc(err)),
  144. zap.Error(err),
  145. )
  146. return
  147. }
  148. // only record errors before pausing stressers
  149. s.emu.Lock()
  150. if !s.paused {
  151. s.ems[err.Error()]++
  152. }
  153. s.emu.Unlock()
  154. }
  155. }
  156. func (s *keyStresser) Pause() map[string]int {
  157. return s.Close()
  158. }
  159. func (s *keyStresser) Close() map[string]int {
  160. s.cancel()
  161. s.cli.Close()
  162. s.wg.Wait()
  163. s.emu.Lock()
  164. s.paused = true
  165. ess := s.ems
  166. s.ems = make(map[string]int, 100)
  167. s.emu.Unlock()
  168. s.lg.Info(
  169. "stress STOP",
  170. zap.String("stress-type", "KV"),
  171. zap.String("endpoint", s.m.EtcdClientEndpoint),
  172. )
  173. return ess
  174. }
  175. func (s *keyStresser) ModifiedKeys() int64 {
  176. return atomic.LoadInt64(&s.atomicModifiedKeys)
  177. }
  178. type stressFunc func(ctx context.Context) (err error, modifiedKeys int64)
  179. type stressEntry struct {
  180. weight float64
  181. f stressFunc
  182. }
  183. type stressTable struct {
  184. entries []stressEntry
  185. sumWeights float64
  186. }
  187. func createStressTable(entries []stressEntry) *stressTable {
  188. st := stressTable{entries: entries}
  189. for _, entry := range st.entries {
  190. st.sumWeights += entry.weight
  191. }
  192. return &st
  193. }
  194. func (st *stressTable) choose() stressFunc {
  195. v := rand.Float64() * st.sumWeights
  196. var sum float64
  197. var idx int
  198. for i := range st.entries {
  199. sum += st.entries[i].weight
  200. if sum >= v {
  201. idx = i
  202. break
  203. }
  204. }
  205. return st.entries[idx].f
  206. }
  207. func newStressPut(cli *clientv3.Client, keySuffixRange, keySize int) stressFunc {
  208. return func(ctx context.Context) (error, int64) {
  209. _, err := cli.Put(
  210. ctx,
  211. fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)),
  212. string(randBytes(keySize)),
  213. )
  214. return err, 1
  215. }
  216. }
  217. func newStressTxn(cli *clientv3.Client, keyTxnSuffixRange, txnOps int) stressFunc {
  218. keys := make([]string, keyTxnSuffixRange)
  219. for i := range keys {
  220. keys[i] = fmt.Sprintf("/k%03d", i)
  221. }
  222. return writeTxn(cli, keys, txnOps)
  223. }
  224. func writeTxn(cli *clientv3.Client, keys []string, txnOps int) stressFunc {
  225. return func(ctx context.Context) (error, int64) {
  226. ks := make(map[string]struct{}, txnOps)
  227. for len(ks) != txnOps {
  228. ks[keys[rand.Intn(len(keys))]] = struct{}{}
  229. }
  230. selected := make([]string, 0, txnOps)
  231. for k := range ks {
  232. selected = append(selected, k)
  233. }
  234. com, delOp, putOp := getTxnOps(selected[0], "bar00")
  235. thenOps := []clientv3.Op{delOp}
  236. elseOps := []clientv3.Op{putOp}
  237. for i := 1; i < txnOps; i++ { // nested txns
  238. k, v := selected[i], fmt.Sprintf("bar%02d", i)
  239. com, delOp, putOp = getTxnOps(k, v)
  240. txnOp := clientv3.OpTxn(
  241. []clientv3.Cmp{com},
  242. []clientv3.Op{delOp},
  243. []clientv3.Op{putOp},
  244. )
  245. thenOps = append(thenOps, txnOp)
  246. elseOps = append(elseOps, txnOp)
  247. }
  248. _, err := cli.Txn(ctx).
  249. If(com).
  250. Then(thenOps...).
  251. Else(elseOps...).
  252. Commit()
  253. return err, int64(txnOps)
  254. }
  255. }
  256. func getTxnOps(k, v string) (
  257. cmp clientv3.Cmp,
  258. dop clientv3.Op,
  259. pop clientv3.Op) {
  260. // if key exists (version > 0)
  261. cmp = clientv3.Compare(clientv3.Version(k), ">", 0)
  262. dop = clientv3.OpDelete(k)
  263. pop = clientv3.OpPut(k, v)
  264. return cmp, dop, pop
  265. }
  266. func newStressRange(cli *clientv3.Client, keySuffixRange int) stressFunc {
  267. return func(ctx context.Context) (error, int64) {
  268. _, err := cli.Get(ctx, fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)))
  269. return err, 0
  270. }
  271. }
  272. func newStressRangeInterval(cli *clientv3.Client, keySuffixRange int) stressFunc {
  273. return func(ctx context.Context) (error, int64) {
  274. start := rand.Intn(keySuffixRange)
  275. end := start + 500
  276. _, err := cli.Get(
  277. ctx,
  278. fmt.Sprintf("foo%016x", start),
  279. clientv3.WithRange(fmt.Sprintf("foo%016x", end)),
  280. )
  281. return err, 0
  282. }
  283. }
  284. func newStressDelete(cli *clientv3.Client, keySuffixRange int) stressFunc {
  285. return func(ctx context.Context) (error, int64) {
  286. _, err := cli.Delete(ctx, fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)))
  287. return err, 1
  288. }
  289. }
  290. func newStressDeleteInterval(cli *clientv3.Client, keySuffixRange int) stressFunc {
  291. return func(ctx context.Context) (error, int64) {
  292. start := rand.Intn(keySuffixRange)
  293. end := start + 500
  294. resp, err := cli.Delete(ctx,
  295. fmt.Sprintf("foo%016x", start),
  296. clientv3.WithRange(fmt.Sprintf("foo%016x", end)),
  297. )
  298. if err == nil {
  299. return nil, resp.Deleted
  300. }
  301. return err, 0
  302. }
  303. }