소스 검색

Add support for sync producer on performance tool

AJ Yoo 7 년 전
부모
커밋
98d852336a
1개의 변경된 파일80개의 추가작업 그리고 33개의 파일을 삭제
  1. 80 33
      tools/kafka-producer-performance/main.go

+ 80 - 33
tools/kafka-producer-performance/main.go

@@ -15,6 +15,11 @@ import (
 )
 
 var (
+	sync = flag.Bool(
+		"sync",
+		false,
+		"Use a synchronous producer.",
+	)
 	messageLoad = flag.Int(
 		"message-load",
 		0,
@@ -196,22 +201,11 @@ func main() {
 		printErrorAndExit(69, "Invalid configuration: %s", err)
 	}
 
-	// The async producer provides maximum performance tuning control.
-	producer, err := sarama.NewAsyncProducer(strings.Split(*brokers, ","), config)
-	if err != nil {
-		printErrorAndExit(69, "Failed to create producer: %s", err)
-	}
-	defer func() {
-		if err := producer.Close(); err != nil {
-			printErrorAndExit(69, "Failed to close producer: %s", err)
-		}
-	}()
-
 	// Construct -messageLoad messages of appoximately -messageSize random bytes.
 	messages := make([]*sarama.ProducerMessage, *messageLoad)
 	for i := 0; i < *messageLoad; i++ {
 		payload := make([]byte, *messageSize)
-		if _, err = rand.Read(payload); err != nil {
+		if _, err := rand.Read(payload); err != nil {
 			printErrorAndExit(69, "Failed to generate message payload: %s", err)
 		}
 		messages[i] = &sarama.ProducerMessage{
@@ -221,19 +215,6 @@ func main() {
 		}
 	}
 
-	// Wait until all messages have been successfully sent (or an error occurs).
-	messagesDone := make(chan struct{})
-	go func() {
-		for i := 0; i < *messageLoad; i++ {
-			select {
-			case <-producer.Successes():
-			case err = <-producer.Errors():
-				printErrorAndExit(69, "%s", err)
-			}
-		}
-		messagesDone <- struct{}{}
-	}()
-
 	// Print out metrics periodically.
 	metricsDone := make(chan struct{})
 	ctx, stopMetrics := context.WithCancel(context.Background())
@@ -250,11 +231,49 @@ func main() {
 		}
 	}(ctx)
 
-	// Produce messages at approximately -throughput messages per second.
-	if *throughput > 0 {
+	brokers := strings.Split(*brokers, ",")
+	if *sync {
+		runSyncProducer(config, brokers, messages, *throughput)
+	} else {
+		runAsyncProducer(config, brokers, messages, *throughput)
+	}
+
+	stopMetrics()
+	<-metricsDone
+	close(metricsDone)
+
+	// Print final metrics.
+	printMetrics(os.Stdout, config.MetricRegistry)
+}
+
+func runAsyncProducer(config *sarama.Config, brokers []string,
+	messages []*sarama.ProducerMessage, throughput int) {
+	producer, err := sarama.NewAsyncProducer(brokers, config)
+	if err != nil {
+		printErrorAndExit(69, "Failed to create producer: %s", err)
+	}
+	defer func() {
+		if err := producer.Close(); err != nil {
+			printErrorAndExit(69, "Failed to close producer: %s", err)
+		}
+	}()
+
+	messagesDone := make(chan struct{})
+	go func() {
+		for i := 0; i < *messageLoad; i++ {
+			select {
+			case <-producer.Successes():
+			case err = <-producer.Errors():
+				printErrorAndExit(69, "%s", err)
+			}
+		}
+		messagesDone <- struct{}{}
+	}()
+
+	if throughput > 0 {
 		ticker := time.NewTicker(time.Second)
 		for _, message := range messages {
-			for i := 0; i < *throughput; i++ {
+			for i := 0; i < throughput; i++ {
 				producer.Input() <- message
 			}
 			<-ticker.C
@@ -268,12 +287,40 @@ func main() {
 
 	<-messagesDone
 	close(messagesDone)
-	stopMetrics()
-	<-metricsDone
-	close(metricsDone)
+}
 
-	// Print final metrics.
-	printMetrics(os.Stdout, config.MetricRegistry)
+func runSyncProducer(config *sarama.Config, brokers []string,
+	messages []*sarama.ProducerMessage, throughput int) {
+	producer, err := sarama.NewSyncProducer(brokers, config)
+	if err != nil {
+		printErrorAndExit(69, "Failed to create producer: %s", err)
+	}
+	defer func() {
+		if err := producer.Close(); err != nil {
+			printErrorAndExit(69, "Failed to close producer: %s", err)
+		}
+	}()
+
+	if throughput > 0 {
+		ticker := time.NewTicker(time.Second)
+		for _, message := range messages {
+			for i := 0; i < throughput; i++ {
+				_, _, err = producer.SendMessage(message)
+				if err != nil {
+					printErrorAndExit(69, "Failed to send message: %s", err)
+				}
+			}
+			<-ticker.C
+		}
+		ticker.Stop()
+	} else {
+		for _, message := range messages {
+			_, _, err = producer.SendMessage(message)
+			if err != nil {
+				printErrorAndExit(69, "Failed to send message: %s", err)
+			}
+		}
+	}
 }
 
 func printMetrics(w io.Writer, r metrics.Registry) {