check.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package command
  15. import (
  16. "context"
  17. "encoding/binary"
  18. "fmt"
  19. "math"
  20. "math/rand"
  21. "os"
  22. "sync"
  23. "time"
  24. v3 "github.com/coreos/etcd/clientv3"
  25. "github.com/coreos/etcd/pkg/report"
  26. "github.com/spf13/cobra"
  27. "golang.org/x/time/rate"
  28. "gopkg.in/cheggaaa/pb.v1"
  29. )
  30. var (
  31. checkPerfLoad string
  32. checkPerfPrefix string
  33. checkPerfAutoCompact bool
  34. checkPerfAutoDefrag bool
  35. )
  36. type checkPerfCfg struct {
  37. limit int
  38. clients int
  39. duration int
  40. }
  41. var checkPerfCfgMap = map[string]checkPerfCfg{
  42. // TODO: support read limit
  43. "s": {
  44. limit: 150,
  45. clients: 50,
  46. duration: 60,
  47. },
  48. "m": {
  49. limit: 1000,
  50. clients: 200,
  51. duration: 60,
  52. },
  53. "l": {
  54. limit: 8000,
  55. clients: 500,
  56. duration: 60,
  57. },
  58. "xl": {
  59. limit: 15000,
  60. clients: 1000,
  61. duration: 60,
  62. },
  63. }
  64. // NewCheckCommand returns the cobra command for "check".
  65. func NewCheckCommand() *cobra.Command {
  66. cc := &cobra.Command{
  67. Use: "check <subcommand>",
  68. Short: "commands for checking properties of the etcd cluster",
  69. }
  70. cc.AddCommand(NewCheckPerfCommand())
  71. return cc
  72. }
  73. // NewCheckPerfCommand returns the cobra command for "check perf".
  74. func NewCheckPerfCommand() *cobra.Command {
  75. cmd := &cobra.Command{
  76. Use: "perf [options]",
  77. Short: "Check the performance of the etcd cluster",
  78. Run: newCheckPerfCommand,
  79. }
  80. // TODO: support customized configuration
  81. cmd.Flags().StringVar(&checkPerfLoad, "load", "s", "The performance check's workload model. Accepted workloads: s(small), m(medium), l(large), xl(xLarge)")
  82. cmd.Flags().StringVar(&checkPerfPrefix, "prefix", "/etcdctl-check-perf/", "The prefix for writing the performance check's keys.")
  83. cmd.Flags().BoolVar(&checkPerfAutoCompact, "auto-compact", false, "Compact storage with last revision after test is finished.")
  84. cmd.Flags().BoolVar(&checkPerfAutoDefrag, "auto-defrag", false, "Defragment storage after test is finished.")
  85. return cmd
  86. }
  87. // newCheckPerfCommand executes the "check perf" command.
  88. func newCheckPerfCommand(cmd *cobra.Command, args []string) {
  89. var checkPerfAlias = map[string]string{
  90. "s": "s", "small": "s",
  91. "m": "m", "medium": "m",
  92. "l": "l", "large": "l",
  93. "xl": "xl", "xLarge": "xl",
  94. }
  95. model, ok := checkPerfAlias[checkPerfLoad]
  96. if !ok {
  97. ExitWithError(ExitBadFeature, fmt.Errorf("unknown load option %v", checkPerfLoad))
  98. }
  99. cfg := checkPerfCfgMap[model]
  100. requests := make(chan v3.Op, cfg.clients)
  101. limit := rate.NewLimiter(rate.Limit(cfg.limit), 1)
  102. cc := clientConfigFromCmd(cmd)
  103. clients := make([]*v3.Client, cfg.clients)
  104. for i := 0; i < cfg.clients; i++ {
  105. clients[i] = cc.mustClient()
  106. }
  107. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.duration)*time.Second)
  108. resp, err := clients[0].Get(ctx, checkPerfPrefix, v3.WithPrefix(), v3.WithLimit(1))
  109. cancel()
  110. if err != nil {
  111. ExitWithError(ExitError, err)
  112. }
  113. if len(resp.Kvs) > 0 {
  114. ExitWithError(ExitInvalidInput, fmt.Errorf("prefix %q has keys. Delete with etcdctl del --prefix %s first.", checkPerfPrefix, checkPerfPrefix))
  115. }
  116. ksize, vsize := 256, 1024
  117. k, v := make([]byte, ksize), string(make([]byte, vsize))
  118. bar := pb.New(cfg.duration)
  119. bar.Format("Bom !")
  120. bar.Start()
  121. r := report.NewReport("%4.4f")
  122. var wg sync.WaitGroup
  123. wg.Add(len(clients))
  124. for i := range clients {
  125. go func(c *v3.Client) {
  126. defer wg.Done()
  127. for op := range requests {
  128. st := time.Now()
  129. _, derr := c.Do(context.Background(), op)
  130. r.Results() <- report.Result{Err: derr, Start: st, End: time.Now()}
  131. }
  132. }(clients[i])
  133. }
  134. go func() {
  135. cctx, ccancel := context.WithTimeout(context.Background(), time.Duration(cfg.duration)*time.Second)
  136. defer ccancel()
  137. for limit.Wait(cctx) == nil {
  138. binary.PutVarint(k, int64(rand.Int63n(math.MaxInt64)))
  139. requests <- v3.OpPut(checkPerfPrefix+string(k), v)
  140. }
  141. close(requests)
  142. }()
  143. go func() {
  144. for i := 0; i < cfg.duration; i++ {
  145. time.Sleep(time.Second)
  146. bar.Add(1)
  147. }
  148. bar.Finish()
  149. }()
  150. sc := r.Stats()
  151. wg.Wait()
  152. close(r.Results())
  153. s := <-sc
  154. ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
  155. dresp, err := clients[0].Delete(ctx, checkPerfPrefix, v3.WithPrefix())
  156. cancel()
  157. if err != nil {
  158. ExitWithError(ExitError, err)
  159. }
  160. if checkPerfAutoCompact {
  161. compact(clients[0], dresp.Header.Revision)
  162. }
  163. if checkPerfAutoDefrag {
  164. for _, ep := range clients[0].Endpoints() {
  165. defrag(clients[0], ep)
  166. }
  167. }
  168. ok = true
  169. if len(s.ErrorDist) != 0 {
  170. fmt.Println("FAIL: too many errors")
  171. for k, v := range s.ErrorDist {
  172. fmt.Printf("FAIL: ERROR(%v) -> %d\n", k, v)
  173. }
  174. ok = false
  175. }
  176. if s.RPS/float64(cfg.limit) <= 0.9 {
  177. fmt.Printf("FAIL: Throughput too low: %d writes/s\n", int(s.RPS)+1)
  178. ok = false
  179. } else {
  180. fmt.Printf("PASS: Throughput is %d writes/s\n", int(s.RPS)+1)
  181. }
  182. if s.Slowest > 0.5 { // slowest request > 500ms
  183. fmt.Printf("Slowest request took too long: %fs\n", s.Slowest)
  184. ok = false
  185. } else {
  186. fmt.Printf("PASS: Slowest request took %fs\n", s.Slowest)
  187. }
  188. if s.Stddev > 0.1 { // stddev > 100ms
  189. fmt.Printf("Stddev too high: %fs\n", s.Stddev)
  190. ok = false
  191. } else {
  192. fmt.Printf("PASS: Stddev is %fs\n", s.Stddev)
  193. }
  194. if ok {
  195. fmt.Println("PASS")
  196. } else {
  197. fmt.Println("FAIL")
  198. os.Exit(ExitError)
  199. }
  200. }
  201. func compact(c *v3.Client, rev int64) {
  202. fmt.Printf("Compacting with revision %d\n", rev)
  203. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  204. _, err := c.Compact(ctx, rev, v3.WithCompactPhysical())
  205. cancel()
  206. if err != nil {
  207. ExitWithError(ExitError, err)
  208. }
  209. fmt.Printf("Compacted with revision %d\n", rev)
  210. }
  211. func defrag(c *v3.Client, ep string) {
  212. fmt.Printf("Defragmenting %q\n", ep)
  213. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  214. _, err := c.Defragment(ctx, ep)
  215. cancel()
  216. if err != nil {
  217. ExitWithError(ExitError, err)
  218. }
  219. fmt.Printf("Defragmented %q\n", ep)
  220. }