瀏覽代碼

add header to kafka-console-producer

Chanwit Piromplad 6 年之前
父節點
當前提交
be7de1e156
共有 1 個文件被更改,包括 20 次插入0 次删除
  1. 20 0
      tools/kafka-console-producer/kafka-console-producer.go

+ 20 - 0
tools/kafka-console-producer/kafka-console-producer.go

@@ -15,6 +15,7 @@ import (
 
 
 var (
 var (
 	brokerList    = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster. You can also set the KAFKA_PEERS environment variable")
 	brokerList    = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster. You can also set the KAFKA_PEERS environment variable")
+	headers       = flag.String("headers", "", "The headers of the message to produce. Example: -headers=foo:bar,bar:foo")
 	topic         = flag.String("topic", "", "REQUIRED: the topic to produce to")
 	topic         = flag.String("topic", "", "REQUIRED: the topic to produce to")
 	key           = flag.String("key", "", "The key of the message to produce. Can be empty.")
 	key           = flag.String("key", "", "The key of the message to produce. Can be empty.")
 	value         = flag.String("value", "", "REQUIRED: the value of the message to produce. You can also provide the value on stdin.")
 	value         = flag.String("value", "", "REQUIRED: the value of the message to produce. You can also provide the value on stdin.")
@@ -99,6 +100,25 @@ func main() {
 		printUsageErrorAndExit("-value is required, or you have to provide the value on stdin")
 		printUsageErrorAndExit("-value is required, or you have to provide the value on stdin")
 	}
 	}
 
 
+	if *headers != "" {
+		hdrs := []sarama.RecordHeader{}
+		arrHdrs := strings.Split(*headers, ",")
+		for _, h := range arrHdrs {
+			if header := strings.Split(h, ":"); len(header) != 2 {
+				printUsageErrorAndExit("-header should be key:value. Example: -headers=foo:bar,bar:foo")
+			} else {
+				hdrs = append(hdrs, sarama.RecordHeader{
+					Key:   []byte(header[0]),
+					Value: []byte(header[1]),
+				})
+			}
+		}
+
+		if len(hdrs) != 0 {
+			message.Headers = hdrs
+		}
+	}
+
 	producer, err := sarama.NewSyncProducer(strings.Split(*brokerList, ","), config)
 	producer, err := sarama.NewSyncProducer(strings.Split(*brokerList, ","), config)
 	if err != nil {
 	if err != nil {
 		printErrorAndExit(69, "Failed to open Kafka producer: %s", err)
 		printErrorAndExit(69, "Failed to open Kafka producer: %s", err)