stresser_key.go 9.2 KB


  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "context"
  17. "fmt"
  18. "math/rand"
  19. "reflect"
  20. "sync"
  21. "sync/atomic"
  22. "time"
  23. "go.etcd.io/etcd/clientv3"
  24. "go.etcd.io/etcd/etcdserver"
  25. "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
  26. "go.etcd.io/etcd/functional/rpcpb"
  27. "go.etcd.io/etcd/raft"
  28. "go.uber.org/zap"
  29. "golang.org/x/time/rate"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/codes"
  32. "google.golang.org/grpc/status"
  33. )
  34. type keyStresser struct {
  35. lg *zap.Logger
  36. m *rpcpb.Member
  37. weightKVWriteSmall float64
  38. weightKVWriteLarge float64
  39. weightKVReadOneKey float64
  40. weightKVReadRange float64
  41. weightKVDeleteOneKey float64
  42. weightKVDeleteRange float64
  43. weightKVTxnWriteDelete float64
  44. keySize int
  45. keyLargeSize int
  46. keySuffixRange int
  47. keyTxnSuffixRange int
  48. keyTxnOps int
  49. rateLimiter *rate.Limiter
  50. wg sync.WaitGroup
  51. clientsN int
  52. ctx context.Context
  53. cancel func()
  54. cli *clientv3.Client
  55. emu sync.RWMutex
  56. ems map[string]int
  57. paused bool
  58. // atomicModifiedKeys records the number of keys created and deleted by the stresser.
  59. atomicModifiedKeys int64
  60. stressTable *stressTable
  61. }
  62. func (s *keyStresser) Stress() error {
  63. var err error
  64. s.cli, err = s.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(1 * time.Second))
  65. if err != nil {
  66. return fmt.Errorf("%v (%q)", err, s.m.EtcdClientEndpoint)
  67. }
  68. s.ctx, s.cancel = context.WithCancel(context.Background())
  69. s.wg.Add(s.clientsN)
  70. s.stressTable = createStressTable([]stressEntry{
  71. {weight: s.weightKVWriteSmall, f: newStressPut(s.cli, s.keySuffixRange, s.keySize)},
  72. {weight: s.weightKVWriteLarge, f: newStressPut(s.cli, s.keySuffixRange, s.keyLargeSize)},
  73. {weight: s.weightKVReadOneKey, f: newStressRange(s.cli, s.keySuffixRange)},
  74. {weight: s.weightKVReadRange, f: newStressRangeInterval(s.cli, s.keySuffixRange)},
  75. {weight: s.weightKVDeleteOneKey, f: newStressDelete(s.cli, s.keySuffixRange)},
  76. {weight: s.weightKVDeleteRange, f: newStressDeleteInterval(s.cli, s.keySuffixRange)},
  77. {weight: s.weightKVTxnWriteDelete, f: newStressTxn(s.cli, s.keyTxnSuffixRange, s.keyTxnOps)},
  78. })
  79. s.emu.Lock()
  80. s.paused = false
  81. s.ems = make(map[string]int, 100)
  82. s.emu.Unlock()
  83. for i := 0; i < s.clientsN; i++ {
  84. go s.run()
  85. }
  86. s.lg.Info(
  87. "stress START",
  88. zap.String("stress-type", "KV"),
  89. zap.String("endpoint", s.m.EtcdClientEndpoint),
  90. )
  91. return nil
  92. }
  93. func (s *keyStresser) run() {
  94. defer s.wg.Done()
  95. for {
  96. if err := s.rateLimiter.Wait(s.ctx); err == context.Canceled {
  97. return
  98. }
  99. // TODO: 10-second is enough timeout to cover leader failure
  100. // and immediate leader election. Find out what other cases this
  101. // could be timed out.
  102. sctx, scancel := context.WithTimeout(s.ctx, 10*time.Second)
  103. err, modifiedKeys := s.stressTable.choose()(sctx)
  104. scancel()
  105. if err == nil {
  106. atomic.AddInt64(&s.atomicModifiedKeys, modifiedKeys)
  107. continue
  108. }
  109. if !s.isRetryableError(err) {
  110. return
  111. }
  112. // only record errors before pausing stressers
  113. s.emu.Lock()
  114. if !s.paused {
  115. s.ems[err.Error()]++
  116. }
  117. s.emu.Unlock()
  118. }
  119. }
  120. func (s *keyStresser) isRetryableError(err error) bool {
  121. switch rpctypes.ErrorDesc(err) {
  122. // retryable
  123. case context.DeadlineExceeded.Error():
  124. // This retries when request is triggered at the same time as
  125. // leader failure. When we terminate the leader, the request to
  126. // that leader cannot be processed, and times out. Also requests
  127. // to followers cannot be forwarded to the old leader, so timing out
  128. // as well. We want to keep stressing until the cluster elects a
  129. // new leader and start processing requests again.
  130. return true
  131. case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
  132. // This retries when request is triggered at the same time as
  133. // leader failure and follower nodes receive time out errors
  134. // from losing their leader. Followers should retry to connect
  135. // to the new leader.
  136. return true
  137. case etcdserver.ErrStopped.Error():
  138. // one of the etcd nodes stopped from failure injection
  139. return true
  140. case rpctypes.ErrNotCapable.Error():
  141. // capability check has not been done (in the beginning)
  142. return true
  143. case rpctypes.ErrTooManyRequests.Error():
  144. // hitting the recovering member.
  145. return true
  146. case raft.ErrProposalDropped.Error():
  147. // removed member, or leadership has changed (old leader got raftpb.MsgProp)
  148. return true
  149. // not retryable.
  150. case context.Canceled.Error():
  151. // from stresser.Cancel method:
  152. return false
  153. }
  154. if status.Convert(err).Code() == codes.Unavailable {
  155. // gRPC connection errors are translated to status.Unavailable
  156. return true
  157. }
  158. s.lg.Warn(
  159. "stress run exiting",
  160. zap.String("stress-type", "KV"),
  161. zap.String("endpoint", s.m.EtcdClientEndpoint),
  162. zap.String("error-type", reflect.TypeOf(err).String()),
  163. zap.String("error-desc", rpctypes.ErrorDesc(err)),
  164. zap.Error(err),
  165. )
  166. return false
  167. }
  168. func (s *keyStresser) Pause() map[string]int {
  169. return s.Close()
  170. }
  171. func (s *keyStresser) Close() map[string]int {
  172. s.cancel()
  173. s.cli.Close()
  174. s.wg.Wait()
  175. s.emu.Lock()
  176. s.paused = true
  177. ess := s.ems
  178. s.ems = make(map[string]int, 100)
  179. s.emu.Unlock()
  180. s.lg.Info(
  181. "stress STOP",
  182. zap.String("stress-type", "KV"),
  183. zap.String("endpoint", s.m.EtcdClientEndpoint),
  184. )
  185. return ess
  186. }
  187. func (s *keyStresser) ModifiedKeys() int64 {
  188. return atomic.LoadInt64(&s.atomicModifiedKeys)
  189. }
  190. type stressFunc func(ctx context.Context) (err error, modifiedKeys int64)
  191. type stressEntry struct {
  192. weight float64
  193. f stressFunc
  194. }
  195. type stressTable struct {
  196. entries []stressEntry
  197. sumWeights float64
  198. }
  199. func createStressTable(entries []stressEntry) *stressTable {
  200. st := stressTable{entries: entries}
  201. for _, entry := range st.entries {
  202. st.sumWeights += entry.weight
  203. }
  204. return &st
  205. }
  206. func (st *stressTable) choose() stressFunc {
  207. v := rand.Float64() * st.sumWeights
  208. var sum float64
  209. var idx int
  210. for i := range st.entries {
  211. sum += st.entries[i].weight
  212. if sum >= v {
  213. idx = i
  214. break
  215. }
  216. }
  217. return st.entries[idx].f
  218. }
  219. func newStressPut(cli *clientv3.Client, keySuffixRange, keySize int) stressFunc {
  220. return func(ctx context.Context) (error, int64) {
  221. _, err := cli.Put(
  222. ctx,
  223. fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)),
  224. string(randBytes(keySize)),
  225. )
  226. return err, 1
  227. }
  228. }
  229. func newStressTxn(cli *clientv3.Client, keyTxnSuffixRange, txnOps int) stressFunc {
  230. keys := make([]string, keyTxnSuffixRange)
  231. for i := range keys {
  232. keys[i] = fmt.Sprintf("/k%03d", i)
  233. }
  234. return writeTxn(cli, keys, txnOps)
  235. }
  236. func writeTxn(cli *clientv3.Client, keys []string, txnOps int) stressFunc {
  237. return func(ctx context.Context) (error, int64) {
  238. ks := make(map[string]struct{}, txnOps)
  239. for len(ks) != txnOps {
  240. ks[keys[rand.Intn(len(keys))]] = struct{}{}
  241. }
  242. selected := make([]string, 0, txnOps)
  243. for k := range ks {
  244. selected = append(selected, k)
  245. }
  246. com, delOp, putOp := getTxnOps(selected[0], "bar00")
  247. thenOps := []clientv3.Op{delOp}
  248. elseOps := []clientv3.Op{putOp}
  249. for i := 1; i < txnOps; i++ { // nested txns
  250. k, v := selected[i], fmt.Sprintf("bar%02d", i)
  251. com, delOp, putOp = getTxnOps(k, v)
  252. txnOp := clientv3.OpTxn(
  253. []clientv3.Cmp{com},
  254. []clientv3.Op{delOp},
  255. []clientv3.Op{putOp},
  256. )
  257. thenOps = append(thenOps, txnOp)
  258. elseOps = append(elseOps, txnOp)
  259. }
  260. _, err := cli.Txn(ctx).
  261. If(com).
  262. Then(thenOps...).
  263. Else(elseOps...).
  264. Commit()
  265. return err, int64(txnOps)
  266. }
  267. }
  268. func getTxnOps(k, v string) (
  269. cmp clientv3.Cmp,
  270. dop clientv3.Op,
  271. pop clientv3.Op) {
  272. // if key exists (version > 0)
  273. cmp = clientv3.Compare(clientv3.Version(k), ">", 0)
  274. dop = clientv3.OpDelete(k)
  275. pop = clientv3.OpPut(k, v)
  276. return cmp, dop, pop
  277. }
  278. func newStressRange(cli *clientv3.Client, keySuffixRange int) stressFunc {
  279. return func(ctx context.Context) (error, int64) {
  280. _, err := cli.Get(ctx, fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)))
  281. return err, 0
  282. }
  283. }
  284. func newStressRangeInterval(cli *clientv3.Client, keySuffixRange int) stressFunc {
  285. return func(ctx context.Context) (error, int64) {
  286. start := rand.Intn(keySuffixRange)
  287. end := start + 500
  288. _, err := cli.Get(
  289. ctx,
  290. fmt.Sprintf("foo%016x", start),
  291. clientv3.WithRange(fmt.Sprintf("foo%016x", end)),
  292. )
  293. return err, 0
  294. }
  295. }
  296. func newStressDelete(cli *clientv3.Client, keySuffixRange int) stressFunc {
  297. return func(ctx context.Context) (error, int64) {
  298. _, err := cli.Delete(ctx, fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)))
  299. return err, 1
  300. }
  301. }
  302. func newStressDeleteInterval(cli *clientv3.Client, keySuffixRange int) stressFunc {
  303. return func(ctx context.Context) (error, int64) {
  304. start := rand.Intn(keySuffixRange)
  305. end := start + 500
  306. resp, err := cli.Delete(ctx,
  307. fmt.Sprintf("foo%016x", start),
  308. clientv3.WithRange(fmt.Sprintf("foo%016x", end)),
  309. )
  310. if err == nil {
  311. return nil, resp.Deleted
  312. }
  313. return err, 0
  314. }
  315. }