Browse Source

Improve logging on producer performance tool

AJ Yoo 7 years ago
parent
commit
906c940fb1
1 changed files with 47 additions and 6 deletions
  1. 47 6
      tools/kafka-producer-performance/main.go

+ 47 - 6
tools/kafka-producer-performance/main.go

@@ -1,9 +1,11 @@
 package main
 package main
 
 
 import (
 import (
+	"context"
 	"crypto/rand"
 	"crypto/rand"
 	"flag"
 	"flag"
 	"fmt"
 	"fmt"
+	"io"
 	"os"
 	"os"
 	"strings"
 	"strings"
 	"time"
 	"time"
@@ -215,7 +217,7 @@ func main() {
 	}
 	}
 
 
 	// Wait until all messages have been successfully sent (or an error occurs).
 	// Wait until all messages have been successfully sent (or an error occurs).
-	done := make(chan struct{})
+	messagesDone := make(chan struct{})
 	go func() {
 	go func() {
 		for i := 0; i < *messageLoad; i++ {
 		for i := 0; i < *messageLoad; i++ {
 			select {
 			select {
@@ -224,9 +226,25 @@ func main() {
 				printErrorAndExit(69, "%s", err)
 				printErrorAndExit(69, "%s", err)
 			}
 			}
 		}
 		}
-		done <- struct{}{}
+		messagesDone <- struct{}{}
 	}()
 	}()
 
 
+	// Print out metrics periodically.
+	metricsDone := make(chan struct{})
+	ctx, stopMetrics := context.WithCancel(context.Background())
+	go func(ctx context.Context) {
+		t := time.Tick(5 * time.Second)
+		for {
+			select {
+			case <-t:
+				printMetrics(os.Stdout, config.MetricRegistry)
+			case <-ctx.Done():
+				metricsDone <- struct{}{}
+				return
+			}
+		}
+	}(ctx)
+
 	// Produce messages at approximately -throughput messages per second.
 	// Produce messages at approximately -throughput messages per second.
 	if *throughput > 0 {
 	if *throughput > 0 {
 		ticker := time.NewTicker(time.Second)
 		ticker := time.NewTicker(time.Second)
@@ -243,11 +261,34 @@ func main() {
 		}
 		}
 	}
 	}
 
 
-	<-done
-	close(done)
+	<-messagesDone
+	close(messagesDone)
+	stopMetrics()
+	<-metricsDone
+	close(metricsDone)
 
 
-	// TODO: Decide on a better format (or add a flag for options).
-	metrics.WriteOnce(config.MetricRegistry, os.Stdout)
+	// Print final metrics.
+	printMetrics(os.Stdout, config.MetricRegistry)
+}
+
+func printMetrics(w io.Writer, r metrics.Registry) {
+	recordSendRate := r.Get("record-send-rate").(metrics.Meter).Snapshot()
+	requestLatency := r.Get("request-latency-in-ms").(metrics.Histogram).Snapshot()
+	requestLatencyPercentiles := requestLatency.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
+	fmt.Fprintf(w, "%d records sent, %.1f records/sec (%.2f MB/sec), "+
+		"%.1f ms avg latency, %.1f ms stddev, %.1f ms 50th, %.1f ms 75th, "+
+		"%.1f ms 95th, %.1f ms 99th, %.1f ms 99.9th\n",
+		recordSendRate.Count(),
+		recordSendRate.RateMean(),
+		recordSendRate.RateMean()*float64(*messageSize)/1024/1024,
+		requestLatency.Mean(),
+		requestLatency.StdDev(),
+		requestLatencyPercentiles[0],
+		requestLatencyPercentiles[1],
+		requestLatencyPercentiles[2],
+		requestLatencyPercentiles[3],
+		requestLatencyPercentiles[4],
+	)
 }
 }
 
 
 func printUsageErrorAndExit(message string) {
 func printUsageErrorAndExit(message string) {