Browse Source

Add routines for sync producer on performance tool

AJ Yoo 7 years ago
parent
commit
11e844d8ff
1 changed files with 75 additions and 35 deletions
  1. 75 35
      tools/kafka-producer-performance/main.go

+ 75 - 35
tools/kafka-producer-performance/main.go

@@ -8,6 +8,7 @@ import (
 	"io"
 	"os"
 	"strings"
+	gosync "sync"
 	"time"
 
 	"github.com/Shopify/sarama"
@@ -115,6 +116,11 @@ var (
 		256,
 		"The number of events to buffer in internal and external channels.",
 	)
+	routines = flag.Int(
+		"routines",
+		1,
+		"The number of routines to send the messages from (-sync only).",
+	)
 	version = flag.String(
 		"version",
 		"0.8.2.0",
@@ -165,6 +171,22 @@ func parseVersion(version string) sarama.KafkaVersion {
 	return result
 }
 
+func generateMessages(topic string, partition, messageLoad, messageSize int) []*sarama.ProducerMessage {
+	messages := make([]*sarama.ProducerMessage, messageLoad)
+	for i := 0; i < messageLoad; i++ {
+		payload := make([]byte, messageSize)
+		if _, err := rand.Read(payload); err != nil {
+			printErrorAndExit(69, "Failed to generate message payload: %s", err)
+		}
+		messages[i] = &sarama.ProducerMessage{
+			Topic:     topic,
+			Partition: int32(partition),
+			Value:     sarama.ByteEncoder(payload),
+		}
+	}
+	return messages
+}
+
 func main() {
 	flag.Parse()
 
@@ -180,6 +202,9 @@ func main() {
 	if *messageSize <= 0 {
 		printUsageErrorAndExit("-message-size must be greater than 0")
 	}
+	if *routines < 1 || *routines > *messageLoad {
+		printUsageErrorAndExit("-routines must be greater than 0 and less than or equal to -message-load")
+	}
 
 	config := sarama.NewConfig()
 
@@ -201,20 +226,6 @@ func main() {
 		printErrorAndExit(69, "Invalid configuration: %s", err)
 	}
 
-	// Construct -messageLoad messages of appoximately -messageSize random bytes.
-	messages := make([]*sarama.ProducerMessage, *messageLoad)
-	for i := 0; i < *messageLoad; i++ {
-		payload := make([]byte, *messageSize)
-		if _, err := rand.Read(payload); err != nil {
-			printErrorAndExit(69, "Failed to generate message payload: %s", err)
-		}
-		messages[i] = &sarama.ProducerMessage{
-			Topic:     *topic,
-			Partition: int32(*partition),
-			Value:     sarama.ByteEncoder(payload),
-		}
-	}
-
 	// Print out metrics periodically.
 	ctx, cancel := context.WithCancel(context.Background())
 	go func(ctx context.Context) {
@@ -231,9 +242,11 @@ func main() {
 
 	brokers := strings.Split(*brokers, ",")
 	if *sync {
-		runSyncProducer(config, brokers, messages, *throughput)
+		runSyncProducer(*topic, *partition, *messageLoad, *messageSize, *routines,
+			config, brokers, *throughput)
 	} else {
-		runAsyncProducer(config, brokers, messages, *throughput)
+		runAsyncProducer(*topic, *partition, *messageLoad, *messageSize,
+			config, brokers, *throughput)
 	}
 
 	cancel()
@@ -243,8 +256,8 @@ func main() {
 	printMetrics(os.Stdout, config.MetricRegistry)
 }
 
-func runAsyncProducer(config *sarama.Config, brokers []string,
-	messages []*sarama.ProducerMessage, throughput int) {
+func runAsyncProducer(topic string, partition, messageLoad, messageSize int,
+	config *sarama.Config, brokers []string, throughput int) {
 	producer, err := sarama.NewAsyncProducer(brokers, config)
 	if err != nil {
 		printErrorAndExit(69, "Failed to create producer: %s", err)
@@ -255,9 +268,11 @@ func runAsyncProducer(config *sarama.Config, brokers []string,
 		}
 	}()
 
+	messages := generateMessages(topic, partition, messageLoad, messageSize)
+
 	messagesDone := make(chan struct{})
 	go func() {
-		for i := 0; i < *messageLoad; i++ {
+		for i := 0; i < messageLoad; i++ {
 			select {
 			case <-producer.Successes():
 			case err = <-producer.Errors():
@@ -286,8 +301,8 @@ func runAsyncProducer(config *sarama.Config, brokers []string,
 	close(messagesDone)
 }
 
-func runSyncProducer(config *sarama.Config, brokers []string,
-	messages []*sarama.ProducerMessage, throughput int) {
+func runSyncProducer(topic string, partition, messageLoad, messageSize, routines int,
+	config *sarama.Config, brokers []string, throughput int) {
 	producer, err := sarama.NewSyncProducer(brokers, config)
 	if err != nil {
 		printErrorAndExit(69, "Failed to create producer: %s", err)
@@ -298,26 +313,51 @@ func runSyncProducer(config *sarama.Config, brokers []string,
 		}
 	}()
 
+	messages := make([][]*sarama.ProducerMessage, routines)
+	for i := 0; i < routines; i++ {
+		if i == routines-1 {
+			messages[i] = generateMessages(topic, partition, messageLoad/routines+messageLoad%routines, messageSize)
+		} else {
+			messages[i] = generateMessages(topic, partition, messageLoad/routines, messageSize)
+		}
+	}
+
+	var wg gosync.WaitGroup
 	if throughput > 0 {
-		ticker := time.NewTicker(time.Second)
-		for _, message := range messages {
-			for i := 0; i < throughput; i++ {
-				_, _, err = producer.SendMessage(message)
-				if err != nil {
-					printErrorAndExit(69, "Failed to send message: %s", err)
+		for _, messages := range messages {
+			messages := messages
+			wg.Add(1)
+			go func() {
+				ticker := time.NewTicker(time.Second)
+				for _, message := range messages {
+					for i := 0; i < throughput; i++ {
+						_, _, err = producer.SendMessage(message)
+						if err != nil {
+							printErrorAndExit(69, "Failed to send message: %s", err)
+						}
+					}
+					<-ticker.C
 				}
-			}
-			<-ticker.C
+				ticker.Stop()
+				wg.Done()
+			}()
 		}
-		ticker.Stop()
 	} else {
-		for _, message := range messages {
-			_, _, err = producer.SendMessage(message)
-			if err != nil {
-				printErrorAndExit(69, "Failed to send message: %s", err)
-			}
+		for _, messages := range messages {
+			messages := messages
+			wg.Add(1)
+			go func() {
+				for _, message := range messages {
+					_, _, err = producer.SendMessage(message)
+					if err != nil {
+						printErrorAndExit(69, "Failed to send message: %s", err)
+					}
+				}
+				wg.Done()
+			}()
 		}
 	}
+	wg.Wait()
 }
 
 func printMetrics(w io.Writer, r metrics.Registry) {