Browse Source

tools/benchmark: stream results into reports

Reports depended on writing all results to a large buffered channel and
reading from that synchronously. Similarly, requests were buffered the
same way which can take significant memory on big request strings. Instead,
have reports stream in results as they're produced then print when the
results channel closes.
Anthony Romano 10 years ago
parent
commit
382103af60

+ 16 - 18
tools/benchmark/cmd/put.go

@@ -47,8 +47,8 @@ func init() {
 }
 }
 
 
 func putFunc(cmd *cobra.Command, args []string) {
 func putFunc(cmd *cobra.Command, args []string) {
-	results = make(chan *result, putTotal)
-	requests := make(chan *etcdserverpb.PutRequest, putTotal)
+	results = make(chan result)
+	requests := make(chan etcdserverpb.PutRequest, totalClients)
 	bar = pb.New(putTotal)
 	bar = pb.New(putTotal)
 
 
 	k, v := mustRandBytes(keySize), mustRandBytes(valSize)
 	k, v := mustRandBytes(keySize), mustRandBytes(valSize)
@@ -68,40 +68,38 @@ func putFunc(cmd *cobra.Command, args []string) {
 
 
 	for i := range clients {
 	for i := range clients {
 		wg.Add(1)
 		wg.Add(1)
-		go doPut(clients[i], requests)
+		go doPut(context.Background(), clients[i], requests)
 	}
 	}
 
 
-	start := time.Now()
-	for i := 0; i < putTotal; i++ {
-		r := &etcdserverpb.PutRequest{
-			Key:   k,
-			Value: v,
+	pdoneC := printReport(results)
+
+	go func() {
+		for i := 0; i < putTotal; i++ {
+			requests <- etcdserverpb.PutRequest{Key: k, Value: v}
 		}
 		}
-		requests <- r
-	}
-	close(requests)
+		close(requests)
+	}()
 
 
 	wg.Wait()
 	wg.Wait()
 
 
 	bar.Finish()
 	bar.Finish()
-	printReport(putTotal, results, time.Now().Sub(start))
+
+	close(results)
+	<-pdoneC
 }
 }
 
 
-func doPut(client etcdserverpb.KVClient, requests <-chan *etcdserverpb.PutRequest) {
+func doPut(ctx context.Context, client etcdserverpb.KVClient, requests <-chan etcdserverpb.PutRequest) {
 	defer wg.Done()
 	defer wg.Done()
 
 
 	for r := range requests {
 	for r := range requests {
 		st := time.Now()
 		st := time.Now()
-		_, err := client.Put(context.Background(), r)
+		_, err := client.Put(ctx, &r)
 
 
 		var errStr string
 		var errStr string
 		if err != nil {
 		if err != nil {
 			errStr = err.Error()
 			errStr = err.Error()
 		}
 		}
-		results <- &result{
-			errStr:   errStr,
-			duration: time.Now().Sub(st),
-		}
+		results <- result{errStr: errStr, duration: time.Since(st)}
 		bar.Increment()
 		bar.Increment()
 	}
 	}
 }
 }

+ 17 - 17
tools/benchmark/cmd/range.go

@@ -55,8 +55,8 @@ func rangeFunc(cmd *cobra.Command, args []string) {
 		end = []byte(args[1])
 		end = []byte(args[1])
 	}
 	}
 
 
-	results = make(chan *result, rangeTotal)
-	requests := make(chan *etcdserverpb.RangeRequest, rangeTotal)
+	results = make(chan result)
+	requests := make(chan etcdserverpb.RangeRequest, totalClients)
 	bar = pb.New(rangeTotal)
 	bar = pb.New(rangeTotal)
 
 
 	conns := make([]*grpc.ClientConn, totalConns)
 	conns := make([]*grpc.ClientConn, totalConns)
@@ -77,37 +77,37 @@ func rangeFunc(cmd *cobra.Command, args []string) {
 		go doRange(clients[i], requests)
 		go doRange(clients[i], requests)
 	}
 	}
 
 
-	start := time.Now()
-	for i := 0; i < rangeTotal; i++ {
-		r := &etcdserverpb.RangeRequest{
-			Key:      k,
-			RangeEnd: end,
+	pdoneC := printReport(results)
+
+	go func() {
+		for i := 0; i < rangeTotal; i++ {
+			requests <- etcdserverpb.RangeRequest{
+				Key:      k,
+				RangeEnd: end}
 		}
 		}
-		requests <- r
-	}
-	close(requests)
+		close(requests)
+	}()
 
 
 	wg.Wait()
 	wg.Wait()
 
 
 	bar.Finish()
 	bar.Finish()
-	printReport(rangeTotal, results, time.Now().Sub(start))
+
+	close(results)
+	<-pdoneC
 }
 }
 
 
-func doRange(client etcdserverpb.KVClient, requests <-chan *etcdserverpb.RangeRequest) {
+func doRange(client etcdserverpb.KVClient, requests <-chan etcdserverpb.RangeRequest) {
 	defer wg.Done()
 	defer wg.Done()
 
 
 	for req := range requests {
 	for req := range requests {
 		st := time.Now()
 		st := time.Now()
-		_, err := client.Range(context.Background(), req)
+		_, err := client.Range(context.Background(), &req)
 
 
 		var errStr string
 		var errStr string
 		if err != nil {
 		if err != nil {
 			errStr = err.Error()
 			errStr = err.Error()
 		}
 		}
-		results <- &result{
-			errStr:   errStr,
-			duration: time.Now().Sub(st),
-		}
+		results <- result{errStr: errStr, duration: time.Since(st)}
 		bar.Increment()
 		bar.Increment()
 	}
 	}
 }
 }

+ 39 - 30
tools/benchmark/cmd/report.go

@@ -39,49 +39,58 @@ type report struct {
 	average  float64
 	average  float64
 	rps      float64
 	rps      float64
 
 
-	results chan *result
+	results chan result
 	total   time.Duration
 	total   time.Duration
 
 
 	errorDist map[string]int
 	errorDist map[string]int
 	lats      []float64
 	lats      []float64
 }
 }
 
 
-func printReport(size int, results chan *result, total time.Duration) {
-	r := &report{
-		results:   results,
-		total:     total,
-		errorDist: make(map[string]int),
-	}
-	r.finalize()
-	r.print()
+func printReport(results chan result) <-chan struct{} {
+	return wrapReport(func() {
+		r := &report{
+			results:   results,
+			errorDist: make(map[string]int),
+		}
+		r.finalize()
+		r.print()
+	})
 }
 }
 
 
-func printRate(size int, results chan *result, total time.Duration) {
-	r := &report{
-		results:   results,
-		total:     total,
-		errorDist: make(map[string]int),
-	}
-	r.finalize()
-	fmt.Printf(" Requests/sec:\t%4.4f\n", r.rps)
+func printRate(results chan result) <-chan struct{} {
+	return wrapReport(func() {
+		r := &report{
+			results:   results,
+			errorDist: make(map[string]int),
+		}
+		r.finalize()
+		fmt.Printf(" Requests/sec:\t%4.4f\n", r.rps)
+	})
+}
+
+func wrapReport(f func()) <-chan struct{} {
+	donec := make(chan struct{})
+	go func() {
+		defer close(donec)
+		f()
+	}()
+	return donec
 }
 }
 
 
 func (r *report) finalize() {
 func (r *report) finalize() {
-	for {
-		select {
-		case res := <-r.results:
-			if res.errStr != "" {
-				r.errorDist[res.errStr]++
-			} else {
-				r.lats = append(r.lats, res.duration.Seconds())
-				r.avgTotal += res.duration.Seconds()
-			}
-		default:
-			r.rps = float64(len(r.lats)) / r.total.Seconds()
-			r.average = r.avgTotal / float64(len(r.lats))
-			return
+	st := time.Now()
+	for res := range r.results {
+		if res.errStr != "" {
+			r.errorDist[res.errStr]++
+		} else {
+			r.lats = append(r.lats, res.duration.Seconds())
+			r.avgTotal += res.duration.Seconds()
 		}
 		}
 	}
 	}
+	r.total = time.Since(st)
+
+	r.rps = float64(len(r.lats)) / r.total.Seconds()
+	r.average = r.avgTotal / float64(len(r.lats))
 }
 }
 
 
 func (r *report) print() {
 func (r *report) print() {

+ 1 - 1
tools/benchmark/cmd/root.go

@@ -39,7 +39,7 @@ var (
 	totalClients uint
 	totalClients uint
 
 
 	bar     *pb.ProgressBar
 	bar     *pb.ProgressBar
-	results chan *result
+	results chan result
 	wg      sync.WaitGroup
 	wg      sync.WaitGroup
 )
 )
 
 

+ 38 - 40
tools/benchmark/cmd/watch.go

@@ -70,7 +70,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
 		watched[i] = mustRandBytes(32)
 		watched[i] = mustRandBytes(32)
 	}
 	}
 
 
-	requests := make(chan *etcdserverpb.WatchRequest, watchTotal)
+	requests := make(chan etcdserverpb.WatchRequest, totalClients)
 
 
 	conns := make([]*grpc.ClientConn, totalConns)
 	conns := make([]*grpc.ClientConn, totalConns)
 	for i := range conns {
 	for i := range conns {
@@ -98,90 +98,88 @@ func watchFunc(cmd *cobra.Command, args []string) {
 	}
 	}
 
 
 	// watching phase
 	// watching phase
-	results = make(chan *result, watchTotal)
+	results = make(chan result)
 	bar = pb.New(watchTotal)
 	bar = pb.New(watchTotal)
 
 
 	bar.Format("Bom !")
 	bar.Format("Bom !")
 	bar.Start()
 	bar.Start()
 
 
-	start := time.Now()
-	for i := 0; i < watchTotal; i++ {
-		r := &etcdserverpb.WatchRequest{
-			Key: watched[i%(len(watched))],
+	pdoneC := printRate(results)
+
+	go func() {
+		for i := 0; i < watchTotal; i++ {
+			requests <- etcdserverpb.WatchRequest{
+				Key: watched[i%(len(watched))],
+			}
 		}
 		}
-		requests <- r
-	}
-	close(requests)
+		close(requests)
+	}()
 
 
 	wg.Wait()
 	wg.Wait()
 	bar.Finish()
 	bar.Finish()
+
 	fmt.Printf("Watch creation summary:\n")
 	fmt.Printf("Watch creation summary:\n")
-	printRate(watchTotal, results, time.Now().Sub(start))
+	close(results)
+	<-pdoneC
 
 
 	// put phase
 	// put phase
 	kv := etcdserverpb.NewKVClient(conns[0])
 	kv := etcdserverpb.NewKVClient(conns[0])
 	// total number of puts * number of watchers on each key
 	// total number of puts * number of watchers on each key
 	eventsTotal := watchPutTotal * (watchTotal / watchedKeyTotal)
 	eventsTotal := watchPutTotal * (watchTotal / watchedKeyTotal)
 
 
-	results = make(chan *result, eventsTotal)
+	results = make(chan result)
 	bar = pb.New(eventsTotal)
 	bar = pb.New(eventsTotal)
 
 
 	bar.Format("Bom !")
 	bar.Format("Bom !")
 	bar.Start()
 	bar.Start()
 
 
-	start = time.Now()
+	putreqc := make(chan etcdserverpb.PutRequest)
 
 
-	// TODO: create multiple clients to do put to increase throughput
-	// TODO: use a real rate-limiter instead of sleep.
 	for i := 0; i < watchPutTotal; i++ {
 	for i := 0; i < watchPutTotal; i++ {
-		r := &etcdserverpb.PutRequest{
-			Key:   watched[i%(len(watched))],
-			Value: []byte("data"),
-		}
-		_, err := kv.Put(context.TODO(), r)
-		if err != nil {
-			fmt.Fprintln(os.Stderr, "Failed to put:", err)
-		}
-		time.Sleep(time.Second / time.Duration(watchPutRate))
+		wg.Add(1)
+		go doPut(context.TODO(), kv, putreqc)
 	}
 	}
 
 
-	for {
-		if len(results) == eventsTotal {
-			break
+	pdoneC = printRate(results)
+
+	go func() {
+		for i := 0; i < eventsTotal; i++ {
+			putreqc <- etcdserverpb.PutRequest{
+				Key:   watched[i%(len(watched))],
+				Value: []byte("data"),
+			}
+			// TODO: use a real rate-limiter instead of sleep.
+			time.Sleep(time.Second / time.Duration(watchPutRate))
 		}
 		}
-		time.Sleep(50 * time.Millisecond)
-	}
+		close(putreqc)
+	}()
 
 
+	wg.Wait()
 	bar.Finish()
 	bar.Finish()
 	fmt.Printf("Watch events received summary:\n")
 	fmt.Printf("Watch events received summary:\n")
-	printRate(eventsTotal, results, time.Now().Sub(start))
+	close(results)
+	<-pdoneC
 }
 }
 
 
-func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan *etcdserverpb.WatchRequest) {
+func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan etcdserverpb.WatchRequest) {
 	for r := range requests {
 	for r := range requests {
 		st := time.Now()
 		st := time.Now()
-		err := stream.Send(r)
+		err := stream.Send(&r)
 		var errStr string
 		var errStr string
 		if err != nil {
 		if err != nil {
 			errStr = err.Error()
 			errStr = err.Error()
 		}
 		}
-		results <- &result{
-			errStr:   errStr,
-			duration: time.Since(st),
-		}
+		results <- result{errStr: errStr, duration: time.Since(st)}
 		bar.Increment()
 		bar.Increment()
 	}
 	}
-	wg.Done()
-
 	for {
 	for {
 		_, err := stream.Recv()
 		_, err := stream.Recv()
 		var errStr string
 		var errStr string
 		if err != nil {
 		if err != nil {
 			errStr = err.Error()
 			errStr = err.Error()
 		}
 		}
-		results <- &result{
-			errStr: errStr,
-		}
+		results <- result{errStr: errStr}
 		bar.Increment()
 		bar.Increment()
 	}
 	}
+	wg.Done()
 }
 }