key_stresser.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "fmt"
  17. "math/rand"
  18. "sync"
  19. "sync/atomic"
  20. "time"
  21. "golang.org/x/net/context" // grpc does a comparison on context.Cancel; can't use "context" package
  22. "golang.org/x/time/rate"
  23. "google.golang.org/grpc"
  24. "google.golang.org/grpc/transport"
  25. "github.com/coreos/etcd/etcdserver"
  26. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  27. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  28. )
  29. type keyStresser struct {
  30. Endpoint string
  31. keyLargeSize int
  32. keySize int
  33. keySuffixRange int
  34. N int
  35. rateLimiter *rate.Limiter
  36. wg sync.WaitGroup
  37. cancel func()
  38. conn *grpc.ClientConn
  39. // atomicModifiedKeys records the number of keys created and deleted by the stresser.
  40. atomicModifiedKeys int64
  41. stressTable *stressTable
  42. }
  43. func (s *keyStresser) Stress() error {
  44. // TODO: add backoff option
  45. conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure())
  46. if err != nil {
  47. return fmt.Errorf("%v (%s)", err, s.Endpoint)
  48. }
  49. ctx, cancel := context.WithCancel(context.Background())
  50. s.wg.Add(s.N)
  51. s.conn = conn
  52. s.cancel = cancel
  53. kvc := pb.NewKVClient(conn)
  54. var stressEntries = []stressEntry{
  55. {weight: 0.7, f: newStressPut(kvc, s.keySuffixRange, s.keySize)},
  56. {
  57. weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize),
  58. f: newStressPut(kvc, s.keySuffixRange, s.keyLargeSize),
  59. },
  60. {weight: 0.07, f: newStressRange(kvc, s.keySuffixRange)},
  61. {weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange)},
  62. {weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
  63. {weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
  64. }
  65. s.stressTable = createStressTable(stressEntries)
  66. for i := 0; i < s.N; i++ {
  67. go s.run(ctx)
  68. }
  69. plog.Infof("keyStresser %q is started", s.Endpoint)
  70. return nil
  71. }
  72. func (s *keyStresser) run(ctx context.Context) {
  73. defer s.wg.Done()
  74. for {
  75. if err := s.rateLimiter.Wait(ctx); err == context.Canceled {
  76. return
  77. }
  78. // TODO: 10-second is enough timeout to cover leader failure
  79. // and immediate leader election. Find out what other cases this
  80. // could be timed out.
  81. sctx, scancel := context.WithTimeout(ctx, 10*time.Second)
  82. err, modifiedKeys := s.stressTable.choose()(sctx)
  83. scancel()
  84. if err == nil {
  85. atomic.AddInt64(&s.atomicModifiedKeys, modifiedKeys)
  86. continue
  87. }
  88. switch grpc.ErrorDesc(err) {
  89. case context.DeadlineExceeded.Error():
  90. // This retries when request is triggered at the same time as
  91. // leader failure. When we terminate the leader, the request to
  92. // that leader cannot be processed, and times out. Also requests
  93. // to followers cannot be forwarded to the old leader, so timing out
  94. // as well. We want to keep stressing until the cluster elects a
  95. // new leader and start processing requests again.
  96. case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
  97. // This retries when request is triggered at the same time as
  98. // leader failure and follower nodes receive time out errors
  99. // from losing their leader. Followers should retry to connect
  100. // to the new leader.
  101. case etcdserver.ErrStopped.Error():
  102. // one of the etcd nodes stopped from failure injection
  103. case transport.ErrConnClosing.Desc:
  104. // server closed the transport (failure injected node)
  105. case rpctypes.ErrNotCapable.Error():
  106. // capability check has not been done (in the beginning)
  107. case rpctypes.ErrTooManyRequests.Error():
  108. // hitting the recovering member.
  109. case context.Canceled.Error():
  110. // from stresser.Cancel method:
  111. return
  112. case grpc.ErrClientConnClosing.Error():
  113. // from stresser.Cancel method:
  114. return
  115. default:
  116. plog.Errorf("keyStresser %v exited with error (%v)", s.Endpoint, err)
  117. return
  118. }
  119. }
  120. }
// Pause stops the stresser. It simply delegates to Close, so the context is
// canceled, the connection is closed and all stresser goroutines have exited
// by the time it returns.
func (s *keyStresser) Pause() {
	s.Close()
}
  124. func (s *keyStresser) Close() {
  125. s.cancel()
  126. s.conn.Close()
  127. s.wg.Wait()
  128. plog.Infof("keyStresser %q is closed", s.Endpoint)
  129. }
  130. func (s *keyStresser) ModifiedKeys() int64 {
  131. return atomic.LoadInt64(&s.atomicModifiedKeys)
  132. }
  133. func (s *keyStresser) Checker() Checker { return nil }
// stressFunc executes a single stress operation bound by ctx. It returns the
// operation's error (nil on success) and the number of keys the operation
// modified. Note that the error is the first result, not the last.
type stressFunc func(ctx context.Context) (err error, modifiedKeys int64)
// stressEntry pairs a stress operation with its selection weight.
type stressEntry struct {
	// weight is the relative likelihood of f being chosen; weights do not
	// need to add up to 1.
	weight float32
	// f is the operation to run when this entry is chosen.
	f stressFunc
}
// stressTable is a weighted collection of stress operations from which
// choose draws at random.
type stressTable struct {
	// entries are the selectable operations.
	entries []stressEntry
	// sumWeights is the sum of the weights of all entries.
	sumWeights float32
}
  143. func createStressTable(entries []stressEntry) *stressTable {
  144. st := stressTable{entries: entries}
  145. for _, entry := range st.entries {
  146. st.sumWeights += entry.weight
  147. }
  148. return &st
  149. }
  150. func (st *stressTable) choose() stressFunc {
  151. v := rand.Float32() * st.sumWeights
  152. var sum float32
  153. var idx int
  154. for i := range st.entries {
  155. sum += st.entries[i].weight
  156. if sum >= v {
  157. idx = i
  158. break
  159. }
  160. }
  161. return st.entries[idx].f
  162. }
  163. func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
  164. return func(ctx context.Context) (error, int64) {
  165. _, err := kvc.Put(ctx, &pb.PutRequest{
  166. Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
  167. Value: randBytes(keySize),
  168. }, grpc.FailFast(false))
  169. return err, 1
  170. }
  171. }
  172. func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
  173. return func(ctx context.Context) (error, int64) {
  174. _, err := kvc.Range(ctx, &pb.RangeRequest{
  175. Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
  176. }, grpc.FailFast(false))
  177. return err, 0
  178. }
  179. }
  180. func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
  181. return func(ctx context.Context) (error, int64) {
  182. start := rand.Intn(keySuffixRange)
  183. end := start + 500
  184. _, err := kvc.Range(ctx, &pb.RangeRequest{
  185. Key: []byte(fmt.Sprintf("foo%016x", start)),
  186. RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
  187. }, grpc.FailFast(false))
  188. return err, 0
  189. }
  190. }
  191. func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc {
  192. return func(ctx context.Context) (error, int64) {
  193. _, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
  194. Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
  195. }, grpc.FailFast(false))
  196. return err, 1
  197. }
  198. }
  199. func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
  200. return func(ctx context.Context) (error, int64) {
  201. start := rand.Intn(keySuffixRange)
  202. end := start + 500
  203. resp, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
  204. Key: []byte(fmt.Sprintf("foo%016x", start)),
  205. RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
  206. }, grpc.FailFast(false))
  207. if err == nil {
  208. return nil, resp.Deleted
  209. }
  210. return err, 0
  211. }
  212. }