Browse Source

Add a kaka-console-producer application.

Willem van Bergen 10 năm trước cách đây
mục cha
commit
f4f1c42e84

+ 2 - 0
tools/kafka-console-producer/.gitignore

@@ -0,0 +1,2 @@
+kafka-console-producer
+kafka-console-producer.test

+ 34 - 0
tools/kafka-console-producer/README.md

@@ -0,0 +1,34 @@
+# kafka-console-producer
+
+A simple command line tool to produce a single message to Kafka.
+
+### Installation
+
+    go install github.com/Shopify/sarama/tools/kafka-console-producer
+
+
+### Usage
+
+    # Minimum invocation
+    kafka-console-producer -topic=test -value=value -brokers=kafka1:9092
+
+    # It will pick up a KAFKA_PEERS environment variable
+    export KAFKA_PEERS=kafka1:9092,kafka2:9092,kafka3:9092
+    kafka-console-producer -topic=test -value=value
+
+    # It will read the value from stdin by using pipes
+    echo "hello world" | kafka-console-producer -topic=test
+
+    # Specify a key:
+    echo "hello world" | kafka-console-producer -topic=test -key=key
+
+    # Partitioning: by default, kafka-console-producer will partition as follows:
+    # - manual partitioning if a -partition is provided
+    # - hash partitioning by key if a -key is provided
+    # - random partioning otherwise.
+    #
+    # You can override this using the -partitioner argument:
+    echo "hello world" | kafka-console-producer -topic=test -key=key -partitioner=random
+
+    # Display all command line options
+    kafka-console-producer -help

+ 118 - 0
tools/kafka-console-producer/kafka-console-producer.go

@@ -0,0 +1,118 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"os"
+	"strings"
+
+	"github.com/Shopify/sarama"
+)
+
+var (
+	brokerList  = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster. You can also set the KAFKA_PEERS environment variable")
+	topic       = flag.String("topic", "", "REQUIRED: the topic to produce to")
+	key         = flag.String("key", "", "The key of the message to produce. Can be empty.")
+	value       = flag.String("value", "", "REQUIRED: the value of the message to produce. You can also provide the value on stdin.")
+	partitioner = flag.String("partitioner", "", "The partitioning scheme to use. Can be `hash`, `manual`, or `random`")
+	partition   = flag.Int("partition", -1, "The partition to produce to.")
+	verbose     = flag.Bool("verbose", false, "Turn on sarama logging to stderr")
+	silent      = flag.Bool("silent", false, "Turn off printing the message's topic, partition, and offset to stdout")
+
+	logger = log.New(os.Stderr, "", log.LstdFlags)
+)
+
+func main() {
+	flag.Parse()
+
+	if *brokerList == "" {
+		printUsageErrorAndExit("no -brokers specified. Alternatively, set the KAFKA_PEERS environment variable")
+	}
+
+	if *topic == "" {
+		printUsageErrorAndExit("no -topic specified")
+	}
+
+	if *verbose {
+		sarama.Logger = logger
+	}
+
+	config := sarama.NewConfig()
+	config.Producer.RequiredAcks = sarama.WaitForAll
+
+	switch *partitioner {
+	case "":
+		if *partition >= 0 {
+			config.Producer.Partitioner = sarama.NewManualPartitioner
+		} else {
+			config.Producer.Partitioner = sarama.NewHashPartitioner
+		}
+	case "hash":
+		config.Producer.Partitioner = sarama.NewHashPartitioner
+	case "random":
+		config.Producer.Partitioner = sarama.NewRandomPartitioner
+	case "manual":
+		config.Producer.Partitioner = sarama.NewManualPartitioner
+		if *partition == -1 {
+			printUsageErrorAndExit("-partition is required when partitioning manually")
+		}
+	default:
+		printUsageErrorAndExit(fmt.Sprintf("Partitioner %s not supported.", *partitioner))
+	}
+
+	message := &sarama.ProducerMessage{Topic: *topic, Partition: int32(*partition)}
+
+	if *key != "" {
+		message.Key = sarama.StringEncoder(*key)
+	}
+
+	if *value != "" {
+		message.Value = sarama.StringEncoder(*value)
+	} else if stdinAvailable() {
+		bytes, err := ioutil.ReadAll(os.Stdin)
+		if err != nil {
+			printErrorAndExit(66, "Failed to read data from the standard input: %s", err)
+		}
+		message.Value = sarama.ByteEncoder(bytes)
+	} else {
+		printUsageErrorAndExit("-value is required, or you have to provide the value on stdin")
+	}
+
+	producer, err := sarama.NewSyncProducer(strings.Split(*brokerList, ","), config)
+	if err != nil {
+		printErrorAndExit(69, "Failed to open Kafka producer: %s", err)
+	}
+	defer func() {
+		if err := producer.Close(); err != nil {
+			logger.Println("Failed to close Kafka producer cleanly:", err)
+		}
+	}()
+
+	partition, offset, err := producer.SendMessage(message)
+	if err != nil {
+		printErrorAndExit(69, "Failed to produce message: %s", err)
+	} else if !*silent {
+		fmt.Printf("topic=%s\tpartition=%d\toffset=%d\n", *topic, partition, offset)
+	}
+}
+
+func printErrorAndExit(code int, format string, values ...interface{}) {
+	fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
+	fmt.Fprintln(os.Stderr)
+	os.Exit(code)
+}
+
+func printUsageErrorAndExit(message string) {
+	fmt.Fprintln(os.Stderr, "ERROR:", message)
+	fmt.Fprintln(os.Stderr)
+	fmt.Fprintln(os.Stderr, "Available command line options:")
+	flag.PrintDefaults()
+	os.Exit(64)
+}
+
+func stdinAvailable() bool {
+	stat, _ := os.Stdin.Stat()
+	return (stat.Mode() & os.ModeCharDevice) == 0
+}