Browse Source

Merge pull request #4056 from heyitsanthony/benchmark-less-mem

tools/benchmark: stream results into reports
Anthony Romano 10 years ago
parent
commit
7e5b7cfc65

+ 16 - 18
tools/benchmark/cmd/put.go

@@ -47,8 +47,8 @@ func init() {
 }
 
 func putFunc(cmd *cobra.Command, args []string) {
-	results = make(chan *result, putTotal)
-	requests := make(chan *etcdserverpb.PutRequest, putTotal)
+	results = make(chan result)
+	requests := make(chan etcdserverpb.PutRequest, totalClients)
 	bar = pb.New(putTotal)
 
 	k, v := mustRandBytes(keySize), mustRandBytes(valSize)
@@ -68,40 +68,38 @@ func putFunc(cmd *cobra.Command, args []string) {
 
 	for i := range clients {
 		wg.Add(1)
-		go doPut(clients[i], requests)
+		go doPut(context.Background(), clients[i], requests)
 	}
 
-	start := time.Now()
-	for i := 0; i < putTotal; i++ {
-		r := &etcdserverpb.PutRequest{
-			Key:   k,
-			Value: v,
+	pdoneC := printReport(results)
+
+	go func() {
+		for i := 0; i < putTotal; i++ {
+			requests <- etcdserverpb.PutRequest{Key: k, Value: v}
 		}
-		requests <- r
-	}
-	close(requests)
+		close(requests)
+	}()
 
 	wg.Wait()
 
 	bar.Finish()
-	printReport(putTotal, results, time.Now().Sub(start))
+
+	close(results)
+	<-pdoneC
 }
 
-func doPut(client etcdserverpb.KVClient, requests <-chan *etcdserverpb.PutRequest) {
+func doPut(ctx context.Context, client etcdserverpb.KVClient, requests <-chan etcdserverpb.PutRequest) {
 	defer wg.Done()
 
 	for r := range requests {
 		st := time.Now()
-		_, err := client.Put(context.Background(), r)
+		_, err := client.Put(ctx, &r)
 
 		var errStr string
 		if err != nil {
 			errStr = err.Error()
 		}
-		results <- &result{
-			errStr:   errStr,
-			duration: time.Now().Sub(st),
-		}
+		results <- result{errStr: errStr, duration: time.Since(st)}
 		bar.Increment()
 	}
 }

+ 17 - 17
tools/benchmark/cmd/range.go

@@ -55,8 +55,8 @@ func rangeFunc(cmd *cobra.Command, args []string) {
 		end = []byte(args[1])
 	}
 
-	results = make(chan *result, rangeTotal)
-	requests := make(chan *etcdserverpb.RangeRequest, rangeTotal)
+	results = make(chan result)
+	requests := make(chan etcdserverpb.RangeRequest, totalClients)
 	bar = pb.New(rangeTotal)
 
 	conns := make([]*grpc.ClientConn, totalConns)
@@ -77,37 +77,37 @@ func rangeFunc(cmd *cobra.Command, args []string) {
 		go doRange(clients[i], requests)
 	}
 
-	start := time.Now()
-	for i := 0; i < rangeTotal; i++ {
-		r := &etcdserverpb.RangeRequest{
-			Key:      k,
-			RangeEnd: end,
+	pdoneC := printReport(results)
+
+	go func() {
+		for i := 0; i < rangeTotal; i++ {
+			requests <- etcdserverpb.RangeRequest{
+				Key:      k,
+				RangeEnd: end}
 		}
-		requests <- r
-	}
-	close(requests)
+		close(requests)
+	}()
 
 	wg.Wait()
 
 	bar.Finish()
-	printReport(rangeTotal, results, time.Now().Sub(start))
+
+	close(results)
+	<-pdoneC
 }
 
-func doRange(client etcdserverpb.KVClient, requests <-chan *etcdserverpb.RangeRequest) {
+func doRange(client etcdserverpb.KVClient, requests <-chan etcdserverpb.RangeRequest) {
 	defer wg.Done()
 
 	for req := range requests {
 		st := time.Now()
-		_, err := client.Range(context.Background(), req)
+		_, err := client.Range(context.Background(), &req)
 
 		var errStr string
 		if err != nil {
 			errStr = err.Error()
 		}
-		results <- &result{
-			errStr:   errStr,
-			duration: time.Now().Sub(st),
-		}
+		results <- result{errStr: errStr, duration: time.Since(st)}
 		bar.Increment()
 	}
 }

+ 39 - 30
tools/benchmark/cmd/report.go

@@ -39,49 +39,58 @@ type report struct {
 	average  float64
 	rps      float64
 
-	results chan *result
+	results chan result
 	total   time.Duration
 
 	errorDist map[string]int
 	lats      []float64
 }
 
-func printReport(size int, results chan *result, total time.Duration) {
-	r := &report{
-		results:   results,
-		total:     total,
-		errorDist: make(map[string]int),
-	}
-	r.finalize()
-	r.print()
+func printReport(results chan result) <-chan struct{} {
+	return wrapReport(func() {
+		r := &report{
+			results:   results,
+			errorDist: make(map[string]int),
+		}
+		r.finalize()
+		r.print()
+	})
 }
 
-func printRate(size int, results chan *result, total time.Duration) {
-	r := &report{
-		results:   results,
-		total:     total,
-		errorDist: make(map[string]int),
-	}
-	r.finalize()
-	fmt.Printf(" Requests/sec:\t%4.4f\n", r.rps)
+func printRate(results chan result) <-chan struct{} {
+	return wrapReport(func() {
+		r := &report{
+			results:   results,
+			errorDist: make(map[string]int),
+		}
+		r.finalize()
+		fmt.Printf(" Requests/sec:\t%4.4f\n", r.rps)
+	})
+}
+
+func wrapReport(f func()) <-chan struct{} {
+	donec := make(chan struct{})
+	go func() {
+		defer close(donec)
+		f()
+	}()
+	return donec
 }
 
 func (r *report) finalize() {
-	for {
-		select {
-		case res := <-r.results:
-			if res.errStr != "" {
-				r.errorDist[res.errStr]++
-			} else {
-				r.lats = append(r.lats, res.duration.Seconds())
-				r.avgTotal += res.duration.Seconds()
-			}
-		default:
-			r.rps = float64(len(r.lats)) / r.total.Seconds()
-			r.average = r.avgTotal / float64(len(r.lats))
-			return
+	st := time.Now()
+	for res := range r.results {
+		if res.errStr != "" {
+			r.errorDist[res.errStr]++
+		} else {
+			r.lats = append(r.lats, res.duration.Seconds())
+			r.avgTotal += res.duration.Seconds()
 		}
 	}
+	r.total = time.Since(st)
+
+	r.rps = float64(len(r.lats)) / r.total.Seconds()
+	r.average = r.avgTotal / float64(len(r.lats))
 }
 
 func (r *report) print() {

+ 1 - 1
tools/benchmark/cmd/root.go

@@ -39,7 +39,7 @@ var (
 	totalClients uint
 
 	bar     *pb.ProgressBar
-	results chan *result
+	results chan result
 	wg      sync.WaitGroup
 )
 

+ 38 - 40
tools/benchmark/cmd/watch.go

@@ -70,7 +70,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
 		watched[i] = mustRandBytes(32)
 	}
 
-	requests := make(chan *etcdserverpb.WatchRequest, watchTotal)
+	requests := make(chan etcdserverpb.WatchRequest, totalClients)
 
 	conns := make([]*grpc.ClientConn, totalConns)
 	for i := range conns {
@@ -98,90 +98,88 @@ func watchFunc(cmd *cobra.Command, args []string) {
 	}
 
 	// watching phase
-	results = make(chan *result, watchTotal)
+	results = make(chan result)
 	bar = pb.New(watchTotal)
 
 	bar.Format("Bom !")
 	bar.Start()
 
-	start := time.Now()
-	for i := 0; i < watchTotal; i++ {
-		r := &etcdserverpb.WatchRequest{
-			Key: watched[i%(len(watched))],
+	pdoneC := printRate(results)
+
+	go func() {
+		for i := 0; i < watchTotal; i++ {
+			requests <- etcdserverpb.WatchRequest{
+				Key: watched[i%(len(watched))],
+			}
 		}
-		requests <- r
-	}
-	close(requests)
+		close(requests)
+	}()
 
 	wg.Wait()
 	bar.Finish()
+
 	fmt.Printf("Watch creation summary:\n")
-	printRate(watchTotal, results, time.Now().Sub(start))
+	close(results)
+	<-pdoneC
 
 	// put phase
 	kv := etcdserverpb.NewKVClient(conns[0])
 	// total number of puts * number of watchers on each key
 	eventsTotal := watchPutTotal * (watchTotal / watchedKeyTotal)
 
-	results = make(chan *result, eventsTotal)
+	results = make(chan result)
 	bar = pb.New(eventsTotal)
 
 	bar.Format("Bom !")
 	bar.Start()
 
-	start = time.Now()
+	putreqc := make(chan etcdserverpb.PutRequest)
 
-	// TODO: create multiple clients to do put to increase throughput
-	// TODO: use a real rate-limiter instead of sleep.
 	for i := 0; i < watchPutTotal; i++ {
-		r := &etcdserverpb.PutRequest{
-			Key:   watched[i%(len(watched))],
-			Value: []byte("data"),
-		}
-		_, err := kv.Put(context.TODO(), r)
-		if err != nil {
-			fmt.Fprintln(os.Stderr, "Failed to put:", err)
-		}
-		time.Sleep(time.Second / time.Duration(watchPutRate))
+		wg.Add(1)
+		go doPut(context.TODO(), kv, putreqc)
 	}
 
-	for {
-		if len(results) == eventsTotal {
-			break
+	pdoneC = printRate(results)
+
+	go func() {
+		for i := 0; i < eventsTotal; i++ {
+			putreqc <- etcdserverpb.PutRequest{
+				Key:   watched[i%(len(watched))],
+				Value: []byte("data"),
+			}
+			// TODO: use a real rate-limiter instead of sleep.
+			time.Sleep(time.Second / time.Duration(watchPutRate))
 		}
-		time.Sleep(50 * time.Millisecond)
-	}
+		close(putreqc)
+	}()
 
+	wg.Wait()
 	bar.Finish()
 	fmt.Printf("Watch events received summary:\n")
-	printRate(eventsTotal, results, time.Now().Sub(start))
+	close(results)
+	<-pdoneC
 }
 
-func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan *etcdserverpb.WatchRequest) {
+func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan etcdserverpb.WatchRequest) {
 	for r := range requests {
 		st := time.Now()
-		err := stream.Send(r)
+		err := stream.Send(&r)
 		var errStr string
 		if err != nil {
 			errStr = err.Error()
 		}
-		results <- &result{
-			errStr:   errStr,
-			duration: time.Since(st),
-		}
+		results <- result{errStr: errStr, duration: time.Since(st)}
 		bar.Increment()
 	}
-	wg.Done()
-
 	for {
 		_, err := stream.Recv()
 		var errStr string
 		if err != nil {
 			errStr = err.Error()
 		}
-		results <- &result{
-			errStr: errStr,
-		}
+		results <- result{errStr: errStr}
 		bar.Increment()
 	}
+	wg.Done()
 }