Browse Source

etcdctl/check: create new check command for memory usage

Create a new command similar to check perf that can check the memory
consumption for putting different workloads on a given endpoint. If no endpoint
is provided, localhost will be used. Return user with a message that whether
there are enough memory for a given workload with pass or fail.

Fixed #9121
Sahdev P. Zala 8 years ago
parent
commit
53d2a2edfe
2 changed files with 204 additions and 0 deletions
  1. 161 0
      etcdctl/ctlv3/command/check.go
  2. 43 0
      etcdctl/ctlv3/command/util.go

+ 161 - 0
etcdctl/ctlv3/command/check.go

@@ -21,6 +21,7 @@ import (
 	"math"
 	"math/rand"
 	"os"
+	"strconv"
 	"sync"
 	"time"
 
@@ -37,6 +38,8 @@ var (
 	checkPerfPrefix      string
 	checkPerfAutoCompact bool
 	checkPerfAutoDefrag  bool
+	checkDatascaleLoad   string
+	checkDatascalePrefix string
 )
 
 type checkPerfCfg struct {
@@ -69,6 +72,36 @@ var checkPerfCfgMap = map[string]checkPerfCfg{
 	},
 }
 
+type checkDatascaleCfg struct {
+	limit   int
+	kvSize  int
+	clients int
+}
+
+var checkDatascaleCfgMap = map[string]checkDatascaleCfg{
+	"s": {
+		limit:   10000,
+		kvSize:  1024,
+		clients: 50,
+	},
+	"m": {
+		limit:   100000,
+		kvSize:  1024,
+		clients: 200,
+	},
+	"l": {
+		limit:   1000000,
+		kvSize:  1024,
+		clients: 500,
+	},
+	"xl": {
+		// xl tries to hit the upper bound aggressively which is 3 versions of 1M objects (3M in total)
+		limit:   30000000,
+		kvSize:  1024,
+		clients: 1000,
+	},
+}
+
 // NewCheckCommand returns the cobra command for "check".
 func NewCheckCommand() *cobra.Command {
 	cc := &cobra.Command{
@@ -77,6 +110,7 @@ func NewCheckCommand() *cobra.Command {
 	}
 
 	cc.AddCommand(NewCheckPerfCommand())
+	cc.AddCommand(NewCheckDatascaleCommand())
 
 	return cc
 }
@@ -252,3 +286,130 @@ func defrag(c *v3.Client, ep string) {
 	}
 	fmt.Printf("Defragmented %q\n", ep)
 }
+
+// NewCheckDatascaleCommand returns the cobra command for "check datascale".
+func NewCheckDatascaleCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "datascale [options]",
+		Short: "Check the memory usage of holding data for diferent workloads on a given server endpoint.",
+		Long:  "If no endpoint is provided, localhost will be used. If multiple endpoints are provided, first endpoint will be used.",
+		Run:   newCheckDatascaleCommand,
+	}
+
+	cmd.Flags().StringVar(&checkDatascaleLoad, "load", "s", "The datascale check's workload model. Accepted workloads: s(small), m(medium), l(large), xl(xLarge)")
+	cmd.Flags().StringVar(&checkDatascalePrefix, "prefix", "/etcdctl-check-datascale/", "The prefix for writing the datascale check's keys.")
+
+	return cmd
+}
+
+// newCheckDatascaleCommand executes the "check datascale" command.
+func newCheckDatascaleCommand(cmd *cobra.Command, args []string) {
+	var checkDatascaleAlias = map[string]string{
+		"s": "s", "small": "s",
+		"m": "m", "medium": "m",
+		"l": "l", "large": "l",
+		"xl": "xl", "xLarge": "xl",
+	}
+
+	model, ok := checkDatascaleAlias[checkDatascaleLoad]
+	if !ok {
+		ExitWithError(ExitBadFeature, fmt.Errorf("unknown load option %v", checkDatascaleLoad))
+	}
+	cfg := checkDatascaleCfgMap[model]
+
+	requests := make(chan v3.Op, cfg.clients)
+
+	cc := clientConfigFromCmd(cmd)
+	clients := make([]*v3.Client, cfg.clients)
+	for i := 0; i < cfg.clients; i++ {
+		clients[i] = cc.mustClient()
+	}
+
+	// get endpoints
+	eps, errEndpoints := endpointsFromCmd(cmd)
+	if errEndpoints != nil {
+		ExitWithError(ExitError, errEndpoints)
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	resp, err := clients[0].Get(ctx, checkDatascalePrefix, v3.WithPrefix(), v3.WithLimit(1))
+	cancel()
+	if err != nil {
+		ExitWithError(ExitError, err)
+	}
+	if len(resp.Kvs) > 0 {
+		ExitWithError(ExitInvalidInput, fmt.Errorf("prefix %q has keys. Delete with etcdctl del --prefix %s first.", checkDatascalePrefix, checkDatascalePrefix))
+	}
+
+	ksize, vsize := 512, 512
+	k, v := make([]byte, ksize), string(make([]byte, vsize))
+
+	r := report.NewReport("%4.4f")
+	var wg sync.WaitGroup
+	wg.Add(len(clients))
+
+	// get the process_resident_memory_bytes and process_virtual_memory_bytes before the put operations
+	bytesBefore := endpointMemoryMetrics(eps[0])
+	if bytesBefore == 0 {
+		fmt.Println("FAIL: Could not read process_resident_memory_bytes before the put operations.")
+		os.Exit(ExitError)
+	}
+
+	fmt.Println(fmt.Sprintf("Start data scale check for work load [%v key-value pairs, %v bytes per key-value, %v concurrent clients].", cfg.limit, cfg.kvSize, cfg.clients))
+	for i := range clients {
+		go func(c *v3.Client) {
+			defer wg.Done()
+			for op := range requests {
+				st := time.Now()
+				_, derr := c.Do(context.Background(), op)
+				r.Results() <- report.Result{Err: derr, Start: st, End: time.Now()}
+			}
+		}(clients[i])
+	}
+
+	go func() {
+		for i := 0; i < cfg.limit; i++ {
+			binary.PutVarint(k, int64(rand.Int63n(math.MaxInt64)))
+			requests <- v3.OpPut(checkDatascalePrefix+string(k), v)
+		}
+		close(requests)
+	}()
+
+	sc := r.Stats()
+	wg.Wait()
+	close(r.Results())
+	s := <-sc
+
+	// get the process_resident_memory_bytes after the put operations
+	bytesAfter := endpointMemoryMetrics(eps[0])
+	if bytesAfter == 0 {
+		fmt.Println("FAIL: Could not read process_resident_memory_bytes after the put operations.")
+		os.Exit(ExitError)
+	}
+
+	// delete the created kv pairs
+	ctx, cancel = context.WithCancel(context.Background())
+	_, err = clients[0].Delete(ctx, checkDatascalePrefix, v3.WithPrefix())
+	defer cancel()
+	if err != nil {
+		ExitWithError(ExitError, err)
+	}
+
+	if bytesAfter == 0 {
+		fmt.Println("FAIL: Could not read process_resident_memory_bytes after the put operations.")
+		os.Exit(ExitError)
+	}
+
+	bytesUsed := bytesAfter - bytesBefore
+	mbUsed := bytesUsed / (1024 * 1024)
+
+	if len(s.ErrorDist) != 0 {
+		fmt.Println("FAIL: too many errors")
+		for k, v := range s.ErrorDist {
+			fmt.Printf("FAIL: ERROR(%v) -> %d\n", k, v)
+		}
+		os.Exit(ExitError)
+	} else {
+		fmt.Println(fmt.Sprintf("PASS: Approximate system memory used : %v MB.", strconv.FormatFloat(float64(mbUsed), 'f', 2, 64)))
+	}
+}

+ 43 - 0
etcdctl/ctlv3/command/util.go

@@ -18,7 +18,11 @@ import (
 	"context"
 	"encoding/hex"
 	"fmt"
+	"io/ioutil"
+	"net/http"
 	"regexp"
+	"strconv"
+	"strings"
 
 	pb "github.com/coreos/etcd/internal/mvcc/mvccpb"
 
@@ -75,3 +79,42 @@ func commandCtx(cmd *cobra.Command) (context.Context, context.CancelFunc) {
 	}
 	return context.WithTimeout(context.Background(), timeOut)
 }
+
+// get the process_resident_memory_bytes from <server:2379>/metrics
+func endpointMemoryMetrics(host string) float64 {
+	residentMemoryKey := "process_resident_memory_bytes"
+	var residentMemoryValue string
+	if !strings.HasPrefix(host, `http://`) {
+		host = "http://" + host
+	}
+	url := host + "/metrics"
+	resp, err := http.Get(url)
+	if err != nil {
+		fmt.Println(fmt.Sprintf("fetch error: %v", err))
+		return 0.0
+	}
+	byts, readerr := ioutil.ReadAll(resp.Body)
+	resp.Body.Close()
+	if readerr != nil {
+		fmt.Println(fmt.Sprintf("fetch error: reading %s: %v", url, readerr))
+		return 0.0
+	}
+
+	for _, line := range strings.Split(string(byts), "\n") {
+		if strings.HasPrefix(line, residentMemoryKey) {
+			residentMemoryValue = strings.TrimSpace(strings.TrimPrefix(line, residentMemoryKey))
+			break
+		}
+	}
+	if residentMemoryValue == "" {
+		fmt.Println(fmt.Sprintf("could not find: %v", residentMemoryKey))
+		return 0.0
+	}
+	residentMemoryBytes, parseErr := strconv.ParseFloat(residentMemoryValue, 64)
+	if parseErr != nil {
+		fmt.Println(fmt.Sprintf("parse error: %v", parseErr))
+		return 0.0
+	}
+
+	return residentMemoryBytes
+}