Browse Source

benchmark: refactor watch benchmark

Anthony Romano 8 years ago
parent
commit
56db7e56f9
1 changed files with 125 additions and 89 deletions
  1. 125 89
      tools/benchmark/cmd/watch.go

+ 125 - 89
tools/benchmark/cmd/watch.go

@@ -15,6 +15,7 @@
 package cmd
 
 import (
+	"context"
 	"encoding/binary"
 	"fmt"
 	"math/rand"
@@ -22,11 +23,11 @@ import (
 	"sync/atomic"
 	"time"
 
-	v3 "github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/pkg/report"
 
 	"github.com/spf13/cobra"
-	"golang.org/x/net/context"
+	"golang.org/x/time/rate"
 	"gopkg.in/cheggaaa/pb.v1"
 )
 
@@ -50,9 +51,9 @@ Each key is watched by (--total/--watched-key-total) watchers.
 }
 
 var (
-	watchTotalStreams int
-	watchTotal        int
-	watchedKeyTotal   int
+	watchStreams          int
+	watchWatchesPerStream int
+	watchedKeyTotal       int
 
 	watchPutRate  int
 	watchPutTotal int
@@ -60,23 +61,27 @@ var (
 	watchKeySize      int
 	watchKeySpaceSize int
 	watchSeqKeys      bool
+)
 
-	eventsTotal int
+type watchedKeys struct {
+	watched     []string
+	numWatchers map[string]int
 
-	nrWatchCompleted       int32
-	nrRecvCompleted        int32
-	watchCompletedNotifier chan struct{}
-	recvCompletedNotifier  chan struct{}
-)
+	watches []clientv3.WatchChan
+
+	// ctx to control all watches
+	ctx    context.Context
+	cancel context.CancelFunc
+}
 
 func init() {
 	RootCmd.AddCommand(watchCmd)
-	watchCmd.Flags().IntVar(&watchTotalStreams, "watchers", 10000, "Total number of watchers")
-	watchCmd.Flags().IntVar(&watchTotal, "total", 100000, "Total number of watch requests")
-	watchCmd.Flags().IntVar(&watchedKeyTotal, "watched-key-total", 10000, "Total number of keys to be watched")
+	watchCmd.Flags().IntVar(&watchStreams, "streams", 10, "Total watch streams")
+	watchCmd.Flags().IntVar(&watchWatchesPerStream, "watch-per-stream", 100, "Total watchers per stream")
+	watchCmd.Flags().IntVar(&watchedKeyTotal, "watched-key-total", 1, "Total number of keys to be watched")
 
-	watchCmd.Flags().IntVar(&watchPutRate, "put-rate", 100, "Number of keys to put per second")
-	watchCmd.Flags().IntVar(&watchPutTotal, "put-total", 10000, "Number of put requests")
+	watchCmd.Flags().IntVar(&watchPutRate, "put-rate", 0, "Number of keys to put per second")
+	watchCmd.Flags().IntVar(&watchPutTotal, "put-total", 1000, "Number of put requests")
 
 	watchCmd.Flags().IntVar(&watchKeySize, "key-size", 32, "Key size of watch request")
 	watchCmd.Flags().IntVar(&watchKeySpaceSize, "key-space-size", 1, "Maximum possible keys")
@@ -88,124 +93,155 @@ func watchFunc(cmd *cobra.Command, args []string) {
 		fmt.Fprintf(os.Stderr, "expected positive --key-space-size, got (%v)", watchKeySpaceSize)
 		os.Exit(1)
 	}
-
-	watched := make([]string, watchedKeyTotal)
-	numWatchers := make(map[string]int)
-	for i := range watched {
-		k := make([]byte, watchKeySize)
-		if watchSeqKeys {
-			binary.PutVarint(k, int64(i%watchKeySpaceSize))
-		} else {
-			binary.PutVarint(k, int64(rand.Intn(watchKeySpaceSize)))
-		}
-		watched[i] = string(k)
+	grpcConns := int(totalClients)
+	if totalClients > totalConns {
+		grpcConns = int(totalConns)
+	}
+	wantedConns := 1 + (watchStreams / 100)
+	if grpcConns < wantedConns {
+		fmt.Fprintf(os.Stderr, "warning: grpc limits 100 streams per client connection, have %d but need %d\n", grpcConns, wantedConns)
 	}
-
-	requests := make(chan string, totalClients)
-
 	clients := mustCreateClients(totalClients, totalConns)
+	wk := newWatchedKeys()
+	benchMakeWatches(clients, wk)
+	benchPutWatches(clients, wk)
+}
 
-	streams := make([]v3.Watcher, watchTotalStreams)
+func benchMakeWatches(clients []*clientv3.Client, wk *watchedKeys) {
+	streams := make([]clientv3.Watcher, watchStreams)
 	for i := range streams {
-		streams[i] = v3.NewWatcher(clients[i%len(clients)])
+		streams[i] = clientv3.NewWatcher(clients[i%len(clients)])
 	}
 
-	// watching phase
-	bar = pb.New(watchTotal)
+	keyc := make(chan string, watchStreams)
+	bar = pb.New(watchStreams * watchWatchesPerStream)
 	bar.Format("Bom !")
 	bar.Start()
 
-	atomic.StoreInt32(&nrWatchCompleted, int32(0))
-	watchCompletedNotifier = make(chan struct{})
-
-	r := report.NewReportRate("%4.4f")
-	for i := range streams {
-		go doWatch(streams[i], requests, r.Results())
+	r := newReport()
+	rch := r.Results()
+
+	wg.Add(len(streams) + 1)
+	wc := make(chan []clientv3.WatchChan, len(streams))
+	for _, s := range streams {
+		go func(s clientv3.Watcher) {
+			defer wg.Done()
+			var ws []clientv3.WatchChan
+			for i := 0; i < watchWatchesPerStream; i++ {
+				k := <-keyc
+				st := time.Now()
+				wch := s.Watch(wk.ctx, k)
+				rch <- report.Result{Start: st, End: time.Now()}
+				ws = append(ws, wch)
+				bar.Increment()
+			}
+			wc <- ws
+		}(s)
 	}
-
 	go func() {
-		for i := 0; i < watchTotal; i++ {
-			key := watched[i%len(watched)]
-			requests <- key
-			numWatchers[key]++
+		defer func() {
+			close(keyc)
+			wg.Done()
+		}()
+		for i := 0; i < watchStreams*watchWatchesPerStream; i++ {
+			key := wk.watched[i%len(wk.watched)]
+			keyc <- key
+			wk.numWatchers[key]++
 		}
-		close(requests)
 	}()
 
 	rc := r.Run()
-	<-watchCompletedNotifier
+	wg.Wait()
 	bar.Finish()
 	close(r.Results())
 	fmt.Printf("Watch creation summary:\n%s", <-rc)
 
-	// put phase
-	eventsTotal = 0
+	for i := 0; i < len(streams); i++ {
+		wk.watches = append(wk.watches, (<-wc)...)
+	}
+}
+
+func newWatchedKeys() *watchedKeys {
+	watched := make([]string, watchedKeyTotal)
+	for i := range watched {
+		k := make([]byte, watchKeySize)
+		if watchSeqKeys {
+			binary.PutVarint(k, int64(i%watchKeySpaceSize))
+		} else {
+			binary.PutVarint(k, int64(rand.Intn(watchKeySpaceSize)))
+		}
+		watched[i] = string(k)
+	}
+	ctx, cancel := context.WithCancel(context.TODO())
+	return &watchedKeys{
+		watched:     watched,
+		numWatchers: make(map[string]int),
+		ctx:         ctx,
+		cancel:      cancel,
+	}
+}
+
+func benchPutWatches(clients []*clientv3.Client, wk *watchedKeys) {
+	eventsTotal := 0
 	for i := 0; i < watchPutTotal; i++ {
-		eventsTotal += numWatchers[watched[i%len(watched)]]
+		eventsTotal += wk.numWatchers[wk.watched[i%len(wk.watched)]]
 	}
 
 	bar = pb.New(eventsTotal)
 	bar.Format("Bom !")
 	bar.Start()
 
-	atomic.StoreInt32(&nrRecvCompleted, 0)
-	recvCompletedNotifier = make(chan struct{})
-	putreqc := make(chan v3.Op)
+	r := newReport()
 
-	r = report.NewReportRate("%4.4f")
-	for i := 0; i < watchPutTotal; i++ {
-		go func(c *v3.Client) {
-			for op := range putreqc {
-				if _, err := c.Do(context.TODO(), op); err != nil {
-					fmt.Fprintf(os.Stderr, "failed to Put for watch benchmark: %v\n", err)
-					os.Exit(1)
-				}
-			}
-		}(clients[i%len(clients)])
+	wg.Add(len(wk.watches))
+	nrRxed := int32(eventsTotal)
+	for _, w := range wk.watches {
+		go func(wc clientv3.WatchChan) {
+			defer wg.Done()
+			recvWatchChan(wc, r.Results(), &nrRxed)
+			wk.cancel()
+		}(w)
 	}
 
+	putreqc := make(chan clientv3.Op, len(clients))
 	go func() {
+		defer close(putreqc)
 		for i := 0; i < watchPutTotal; i++ {
-			putreqc <- v3.OpPut(watched[i%(len(watched))], "data")
-			// TODO: use a real rate-limiter instead of sleep.
-			time.Sleep(time.Second / time.Duration(watchPutRate))
+			putreqc <- clientv3.OpPut(wk.watched[i%(len(wk.watched))], "data")
 		}
-		close(putreqc)
 	}()
 
-	rc = r.Run()
-	<-recvCompletedNotifier
+	limit := rate.NewLimiter(rate.Limit(watchPutRate), 1)
+	for _, cc := range clients {
+		go func(c *clientv3.Client) {
+			for op := range putreqc {
+				if err := limit.Wait(context.TODO()); err != nil {
+					panic(err)
+				}
+				if _, err := c.Do(context.TODO(), op); err != nil {
+					panic(err)
+				}
+			}
+		}(cc)
+	}
+
+	rc := r.Run()
+	wg.Wait()
 	bar.Finish()
 	close(r.Results())
 	fmt.Printf("Watch events received summary:\n%s", <-rc)
-}
 
-func doWatch(stream v3.Watcher, requests <-chan string, results chan<- report.Result) {
-	for r := range requests {
-		st := time.Now()
-		wch := stream.Watch(context.TODO(), r)
-		results <- report.Result{Start: st, End: time.Now()}
-		bar.Increment()
-		go recvWatchChan(wch, results)
-	}
-	atomic.AddInt32(&nrWatchCompleted, 1)
-	if atomic.LoadInt32(&nrWatchCompleted) == int32(watchTotalStreams) {
-		watchCompletedNotifier <- struct{}{}
-	}
 }
 
-func recvWatchChan(wch v3.WatchChan, results chan<- report.Result) {
+func recvWatchChan(wch clientv3.WatchChan, results chan<- report.Result, nrRxed *int32) {
 	for r := range wch {
 		st := time.Now()
 		for range r.Events {
 			results <- report.Result{Start: st, End: time.Now()}
 			bar.Increment()
-			atomic.AddInt32(&nrRecvCompleted, 1)
-		}
-
-		if atomic.LoadInt32(&nrRecvCompleted) == int32(eventsTotal) {
-			recvCompletedNotifier <- struct{}{}
-			break
+			if atomic.AddInt32(nrRxed, -1) <= 0 {
+				return
+			}
 		}
 	}
 }