Sfoglia il codice sorgente

Merge pull request #1222 from TheSalmonTapes/master

Producer performance tool
Varun 7 anni fa
parent
commit
8e19158282

+ 1 - 0
tools/README.md

@@ -6,5 +6,6 @@ Some of these tools mirror tools that ship with Kafka, but these tools won't req
 - [kafka-console-producer](./kafka-console-producer): a command line tool to produce a single message to your Kafka cluster.
 - [kafka-console-partitionconsumer](./kafka-console-partitionconsumer): (deprecated) a command line tool to consume a single partition of a topic on your Kafka cluster.
 - [kafka-console-consumer](./kafka-console-consumer): a command line tool to consume arbitrary partitions of a topic on your Kafka cluster.
+- [kafka-producer-performance](./kafka-producer-performance): a command line tool to performance test producers (sync and async) on your Kafka cluster.
 
 To install all tools, run `go get github.com/Shopify/sarama/tools/...`

+ 20 - 0
tools/kafka-producer-performance/README.md

@@ -0,0 +1,20 @@
+# kafka-producer-performance
+
+A command line tool to test producer performance.
+
+### Installation
+
+    go get github.com/Shopify/sarama/tools/kafka-producer-performance
+
+
+### Usage
+
+    # Display all command line options
+    kafka-producer-performance -help
+
+    # Minimum invocation
+    kafka-producer-performance \
+      -brokers=kafka:9092 \
+      -message-load=50000 \
+      -message-size=100 \
+      -topic=producer_test

+ 400 - 0
tools/kafka-producer-performance/main.go

@@ -0,0 +1,400 @@
+package main
+
+import (
+	"context"
+	"crypto/rand"
+	"flag"
+	"fmt"
+	"io"
+	"os"
+	"strings"
+	gosync "sync"
+	"time"
+
+	"github.com/Shopify/sarama"
+	metrics "github.com/rcrowley/go-metrics"
+)
+
+// Command line flags for the performance tool. Flags marked REQUIRED in
+// their usage string are validated in main after flag.Parse. Note that the
+// `sync` flag variable shadows the standard library package name, which is
+// why the file imports it under the alias gosync.
+var (
+	sync = flag.Bool(
+		"sync",
+		false,
+		"Use a synchronous producer.",
+	)
+	messageLoad = flag.Int(
+		"message-load",
+		0,
+		"REQUIRED: The number of messages to produce to -topic.",
+	)
+	messageSize = flag.Int(
+		"message-size",
+		0,
+		"REQUIRED: The approximate size (in bytes) of each message to produce to -topic.",
+	)
+	brokers = flag.String(
+		"brokers",
+		"",
+		"REQUIRED: A comma separated list of broker addresses.",
+	)
+	topic = flag.String(
+		"topic",
+		"",
+		"REQUIRED: The topic to run the performance test on.",
+	)
+	partition = flag.Int(
+		"partition",
+		-1,
+		"The partition of -topic to run the performance test on.",
+	)
+	throughput = flag.Int(
+		"throughput",
+		0,
+		"The maximum number of messages to send per second (0 for no limit).",
+	)
+	maxMessageBytes = flag.Int(
+		"max-message-bytes",
+		1000000,
+		"The max permitted size of a message.",
+	)
+	requiredAcks = flag.Int(
+		"required-acks",
+		1,
+		"The required number of acks needed from the broker (-1: all, 0: none, 1: local).",
+	)
+	timeout = flag.Duration(
+		"timeout",
+		10*time.Second,
+		"The duration the producer will wait to receive -required-acks.",
+	)
+	partitioner = flag.String(
+		"partitioner",
+		"roundrobin",
+		"The partitioning scheme to use (hash, manual, random, roundrobin).",
+	)
+	compression = flag.String(
+		"compression",
+		"none",
+		"The compression method to use (none, gzip, snappy, lz4).",
+	)
+	flushFrequency = flag.Duration(
+		"flush-frequency",
+		0,
+		"The best-effort frequency of flushes.",
+	)
+	flushBytes = flag.Int(
+		"flush-bytes",
+		0,
+		"The best-effort number of bytes needed to trigger a flush.",
+	)
+	flushMessages = flag.Int(
+		"flush-messages",
+		0,
+		"The best-effort number of messages needed to trigger a flush.",
+	)
+	flushMaxMessages = flag.Int(
+		"flush-max-messages",
+		0,
+		"The maximum number of messages the producer will send in a single request.",
+	)
+	retryMax = flag.Int(
+		"retry-max",
+		3,
+		"The total number of times to retry sending a message.",
+	)
+	retryBackoff = flag.Duration(
+		"retry-backoff",
+		100*time.Millisecond,
+		"The duration the producer will wait for the cluster to settle between retries.",
+	)
+	clientID = flag.String(
+		"client-id",
+		"sarama",
+		"The client ID sent with every request to the brokers.",
+	)
+	channelBufferSize = flag.Int(
+		"channel-buffer-size",
+		256,
+		"The number of events to buffer in internal and external channels.",
+	)
+	routines = flag.Int(
+		"routines",
+		1,
+		"The number of routines to send the messages from (-sync only).",
+	)
+	version = flag.String(
+		"version",
+		"0.8.2.0",
+		"The assumed version of Kafka.",
+	)
+)
+
+// parseCompression maps a -compression scheme name onto the corresponding
+// sarama codec, exiting with a usage error for unknown schemes.
+func parseCompression(scheme string) sarama.CompressionCodec {
+	codecs := map[string]sarama.CompressionCodec{
+		"none":   sarama.CompressionNone,
+		"gzip":   sarama.CompressionGZIP,
+		"snappy": sarama.CompressionSnappy,
+		"lz4":    sarama.CompressionLZ4,
+	}
+	codec, ok := codecs[scheme]
+	if !ok {
+		printUsageErrorAndExit(fmt.Sprintf("Unknown -compression: %s", scheme))
+	}
+	return codec
+}
+
+// parsePartitioner maps a -partitioner scheme name onto the corresponding
+// sarama partitioner constructor, exiting with a usage error for unknown
+// schemes. The manual scheme additionally requires an explicit -partition.
+func parsePartitioner(scheme string, partition int) sarama.PartitionerConstructor {
+	if partition < 0 && scheme == "manual" {
+		// Manual partitioning needs a concrete target partition.
+		printUsageErrorAndExit("-partition must not be -1 for -partitioner=manual")
+	}
+	switch scheme {
+	case "manual":
+		return sarama.NewManualPartitioner
+	case "hash":
+		return sarama.NewHashPartitioner
+	case "random":
+		return sarama.NewRandomPartitioner
+	case "roundrobin":
+		return sarama.NewRoundRobinPartitioner
+	default:
+		// Bug fix: these messages previously referenced a non-existent
+		// "-partitioning" flag; the flag is registered as -partitioner.
+		printUsageErrorAndExit(fmt.Sprintf("Unknown -partitioner: %s", scheme))
+	}
+	// Unreachable: printUsageErrorAndExit calls os.Exit, but the compiler
+	// cannot see that, so a terminating statement is still required.
+	panic("should not happen")
+}
+
+// parseVersion converts the -version string into a sarama.KafkaVersion,
+// exiting with a usage error when the string is not a recognized version.
+func parseVersion(version string) sarama.KafkaVersion {
+	v, err := sarama.ParseKafkaVersion(version)
+	if err == nil {
+		return v
+	}
+	printUsageErrorAndExit(fmt.Sprintf("unknown -version: %s", version))
+	return v // unreachable: printUsageErrorAndExit exits the process
+}
+
+// generateMessages builds messageLoad producer messages addressed to
+// topic/partition, each carrying messageSize bytes of cryptographically
+// random payload. It exits the process if random data cannot be read.
+func generateMessages(topic string, partition, messageLoad, messageSize int) []*sarama.ProducerMessage {
+	messages := make([]*sarama.ProducerMessage, 0, messageLoad)
+	for len(messages) < messageLoad {
+		payload := make([]byte, messageSize)
+		if _, err := rand.Read(payload); err != nil {
+			printErrorAndExit(69, "Failed to generate message payload: %s", err)
+		}
+		messages = append(messages, &sarama.ProducerMessage{
+			Topic:     topic,
+			Partition: int32(partition),
+			Value:     sarama.ByteEncoder(payload),
+		})
+	}
+	return messages
+}
+
+// main parses and validates the command line flags, builds the sarama
+// configuration, starts a background goroutine that prints producer metrics
+// every five seconds, runs the test with either the sync or async producer,
+// and finally prints a closing metrics summary.
+func main() {
+	flag.Parse()
+
+	// Required-flag validation; failures exit with code 64 plus usage text.
+	if *brokers == "" {
+		printUsageErrorAndExit("-brokers is required")
+	}
+	if *topic == "" {
+		printUsageErrorAndExit("-topic is required")
+	}
+	if *messageLoad <= 0 {
+		printUsageErrorAndExit("-message-load must be greater than 0")
+	}
+	if *messageSize <= 0 {
+		printUsageErrorAndExit("-message-size must be greater than 0")
+	}
+	if *routines < 1 || *routines > *messageLoad {
+		printUsageErrorAndExit("-routines must be greater than 0 and less than or equal to -message-load")
+	}
+
+	config := sarama.NewConfig()
+
+	config.Producer.MaxMessageBytes = *maxMessageBytes
+	config.Producer.RequiredAcks = sarama.RequiredAcks(*requiredAcks)
+	config.Producer.Timeout = *timeout
+	config.Producer.Partitioner = parsePartitioner(*partitioner, *partition)
+	config.Producer.Compression = parseCompression(*compression)
+	config.Producer.Flush.Frequency = *flushFrequency
+	config.Producer.Flush.Bytes = *flushBytes
+	config.Producer.Flush.Messages = *flushMessages
+	config.Producer.Flush.MaxMessages = *flushMaxMessages
+	// Successes must be returned so the async path can count delivered
+	// messages on the Successes channel.
+	config.Producer.Return.Successes = true
+	config.ClientID = *clientID
+	config.ChannelBufferSize = *channelBufferSize
+	config.Version = parseVersion(*version)
+
+	if err := config.Validate(); err != nil {
+		printErrorAndExit(69, "Invalid configuration: %s", err)
+	}
+
+	// Print out metrics periodically.
+	done := make(chan struct{})
+	ctx, cancel := context.WithCancel(context.Background())
+	go func(ctx context.Context) {
+		defer close(done)
+		// NOTE(review): time.Tick leaks its underlying ticker; harmless
+		// here since the process exits soon after cancel, but
+		// time.NewTicker + Stop would be cleaner.
+		t := time.Tick(5 * time.Second)
+		for {
+			select {
+			case <-t:
+				printMetrics(os.Stdout, config.MetricRegistry)
+			case <-ctx.Done():
+				return
+			}
+		}
+	}(ctx)
+
+	brokers := strings.Split(*brokers, ",")
+	if *sync {
+		runSyncProducer(*topic, *partition, *messageLoad, *messageSize, *routines,
+			config, brokers, *throughput)
+	} else {
+		runAsyncProducer(*topic, *partition, *messageLoad, *messageSize,
+			config, brokers, *throughput)
+	}
+
+	// Stop the reporter goroutine and wait for it before the final report,
+	// so the two printMetrics calls cannot interleave on stdout.
+	cancel()
+	<-done
+
+	// Print final metrics.
+	printMetrics(os.Stdout, config.MetricRegistry)
+}
+
+// runAsyncProducer sends messageLoad messages of messageSize bytes with an
+// asynchronous producer, optionally rate limited to throughput messages per
+// second, and blocks until every message has been acknowledged. Any producer
+// error aborts the process with exit code 69.
+func runAsyncProducer(topic string, partition, messageLoad, messageSize int,
+	config *sarama.Config, brokers []string, throughput int) {
+	producer, err := sarama.NewAsyncProducer(brokers, config)
+	if err != nil {
+		printErrorAndExit(69, "Failed to create producer: %s", err)
+	}
+	defer func() {
+		if err := producer.Close(); err != nil {
+			printErrorAndExit(69, "Failed to close producer: %s", err)
+		}
+	}()
+
+	messages := generateMessages(topic, partition, messageLoad, messageSize)
+
+	// Drain acknowledgements concurrently: exactly one success or error is
+	// expected per message because Producer.Return.Successes is enabled.
+	messagesDone := make(chan struct{})
+	go func() {
+		defer close(messagesDone)
+		for i := 0; i < messageLoad; i++ {
+			select {
+			case <-producer.Successes():
+			case err := <-producer.Errors():
+				printErrorAndExit(69, "%s", err)
+			}
+		}
+	}()
+
+	if throughput > 0 {
+		// Bug fix: the previous version sent *each* message `throughput`
+		// times (messageLoad*throughput sends in total, while the drain
+		// goroutine only waited for messageLoad acks). Send every message
+		// exactly once, pausing on a one-second ticker after each batch of
+		// `throughput` messages.
+		ticker := time.NewTicker(time.Second)
+		for i, message := range messages {
+			producer.Input() <- message
+			if (i+1)%throughput == 0 {
+				<-ticker.C
+			}
+		}
+		ticker.Stop()
+	} else {
+		for _, message := range messages {
+			producer.Input() <- message
+		}
+	}
+
+	<-messagesDone
+}
+
+// runSyncProducer sends messageLoad messages of messageSize bytes with a
+// synchronous producer, spreading the work over `routines` goroutines and
+// optionally rate limiting each goroutine to throughput messages per second.
+// Any send error aborts the process with exit code 69.
+func runSyncProducer(topic string, partition, messageLoad, messageSize, routines int,
+	config *sarama.Config, brokers []string, throughput int) {
+	producer, err := sarama.NewSyncProducer(brokers, config)
+	if err != nil {
+		printErrorAndExit(69, "Failed to create producer: %s", err)
+	}
+	defer func() {
+		if err := producer.Close(); err != nil {
+			printErrorAndExit(69, "Failed to close producer: %s", err)
+		}
+	}()
+
+	// Split the load evenly; the last routine also takes the remainder so
+	// exactly messageLoad messages are generated overall.
+	messages := make([][]*sarama.ProducerMessage, routines)
+	perRoutine := messageLoad / routines
+	for i := 0; i < routines; i++ {
+		if i == routines-1 {
+			messages[i] = generateMessages(topic, partition, perRoutine+messageLoad%routines, messageSize)
+		} else {
+			messages[i] = generateMessages(topic, partition, perRoutine, messageSize)
+		}
+	}
+
+	var wg gosync.WaitGroup
+	for _, batch := range messages {
+		batch := batch // capture per-iteration value (pre-Go 1.22 semantics)
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			var ticker *time.Ticker
+			if throughput > 0 {
+				ticker = time.NewTicker(time.Second)
+				defer ticker.Stop()
+			}
+			for i, message := range batch {
+				// Bug fix: use a goroutine-local error instead of writing
+				// to the shared outer `err` from every goroutine, which was
+				// a data race under -race.
+				if _, _, err := producer.SendMessage(message); err != nil {
+					printErrorAndExit(69, "Failed to send message: %s", err)
+				}
+				// Bug fix: the previous version sent each message
+				// `throughput` times per second. Now each message is sent
+				// once and the routine pauses after every `throughput`
+				// sends.
+				if ticker != nil && (i+1)%throughput == 0 {
+					<-ticker.C
+				}
+			}
+		}()
+	}
+	wg.Wait()
+}
+
+// printMetrics writes a one-line summary of the producer's record send rate
+// and request latency percentiles to w. It is a no-op until sarama has
+// registered both metrics in the registry.
+func printMetrics(w io.Writer, r metrics.Registry) {
+	sendMetric := r.Get("record-send-rate")
+	latencyMetric := r.Get("request-latency-in-ms")
+	if sendMetric == nil || latencyMetric == nil {
+		return
+	}
+	sendRate := sendMetric.(metrics.Meter).Snapshot()
+	latency := latencyMetric.(metrics.Histogram).Snapshot()
+	pct := latency.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
+	fmt.Fprintf(w, "%d records sent, %.1f records/sec (%.2f MB/sec), "+
+		"%.1f ms avg latency, %.1f ms stddev, %.1f ms 50th, %.1f ms 75th, "+
+		"%.1f ms 95th, %.1f ms 99th, %.1f ms 99.9th\n",
+		sendRate.Count(),
+		sendRate.RateMean(),
+		sendRate.RateMean()*float64(*messageSize)/1024/1024,
+		latency.Mean(),
+		latency.StdDev(),
+		pct[0],
+		pct[1],
+		pct[2],
+		pct[3],
+		pct[4],
+	)
+}
+
+// printUsageErrorAndExit reports a command line usage error together with
+// the available flags on stderr, then exits with code 64 (EX_USAGE).
+func printUsageErrorAndExit(message string) {
+	w := os.Stderr
+	fmt.Fprintln(w, "ERROR:", message)
+	fmt.Fprintln(w)
+	fmt.Fprintln(w, "Available command line options:")
+	flag.PrintDefaults()
+	os.Exit(64)
+}
+
+// printErrorAndExit reports a formatted runtime error on stderr and then
+// terminates the process with the given exit code.
+func printErrorAndExit(code int, format string, values ...interface{}) {
+	message := fmt.Sprintf(format, values...)
+	fmt.Fprintf(os.Stderr, "ERROR: %s\n", message)
+	fmt.Fprintln(os.Stderr)
+	os.Exit(code)
+}