Browse Source

benchmark: support multiple clients/conns in watch-latency benchmark

Anthony Romano 8 years ago
parent
commit
ebd6e8c4b1
1 changed files with 31 additions and 12 deletions
  1. 31 12
      tools/benchmark/cmd/watch_latency.go

+ 31 - 12
tools/benchmark/cmd/watch_latency.go

@@ -17,9 +17,10 @@ package cmd
 import (
 	"fmt"
 	"os"
+	"sync"
 	"time"
 
-	v3 "github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/pkg/report"
 
 	"github.com/spf13/cobra"
@@ -47,19 +48,23 @@ var (
 
 func init() {
 	RootCmd.AddCommand(watchLatencyCmd)
-	watchLatencyCmd.Flags().IntVar(&watchLTotal, "total", 10000, "Total number of watch responses.")
+	watchLatencyCmd.Flags().IntVar(&watchLTotal, "total", 10000, "Total number of put requests")
 	watchLatencyCmd.Flags().IntVar(&watchLPutRate, "put-rate", 100, "Number of keys to put per second")
-	watchLatencyCmd.Flags().IntVar(&watchLKeySize, "key-size", 32, "Key size of watch request")
-	watchLatencyCmd.Flags().IntVar(&watchLValueSize, "val-size", 32, "Val size of watch request")
+	watchLatencyCmd.Flags().IntVar(&watchLKeySize, "key-size", 32, "Key size of watch response")
+	watchLatencyCmd.Flags().IntVar(&watchLValueSize, "val-size", 32, "Value size of watch response")
 }
 
 func watchLatencyFunc(cmd *cobra.Command, args []string) {
 	key := string(mustRandBytes(watchLKeySize))
 	value := string(mustRandBytes(watchLValueSize))
 
-	client := mustCreateConn()
-	stream := v3.NewWatcher(client)
-	wch := stream.Watch(context.TODO(), key)
+	clients := mustCreateClients(totalClients, totalConns)
+	putClient := mustCreateConn()
+
+	wchs := make([]clientv3.WatchChan, len(clients))
+	for i := range wchs {
+		wchs[i] = clients[i].Watch(context.TODO(), key)
+	}
 
 	bar = pb.New(watchLTotal)
 	bar.Format("Bom !")
@@ -74,15 +79,29 @@ func watchLatencyFunc(cmd *cobra.Command, args []string) {
 		if err := limiter.Wait(context.TODO()); err != nil {
 			break
 		}
-		_, err := client.Put(context.TODO(), string(key), value)
 
-		if err != nil {
+		var st time.Time
+		var wg sync.WaitGroup
+		wg.Add(len(clients))
+		barrierc := make(chan struct{})
+		for _, wch := range wchs {
+			ch := wch
+			go func() {
+				<-barrierc
+				<-ch
+				r.Results() <- report.Result{Start: st, End: time.Now()}
+				wg.Done()
+			}()
+		}
+
+		if _, err := putClient.Put(context.TODO(), key, value); err != nil {
 			fmt.Fprintf(os.Stderr, "Failed to Put for watch latency benchmark: %v\n", err)
 			os.Exit(1)
 		}
-		st := time.Now()
-		<-wch
-		r.Results() <- report.Result{Err: err, Start: st, End: time.Now()}
+
+		st = time.Now()
+		close(barrierc)
+		wg.Wait()
 		bar.Increment()
 	}