Kaynağa Gözat

Add kafka-console-partitionconsumer tool.

Willem van Bergen 10 yıl önce
ebeveyn
işleme
29c42efc69

+ 2 - 0
tools/kafka-console-partitionconsumer/.gitignore

@@ -0,0 +1,2 @@
+kafka-console-partitionconsumer
+kafka-console-partitionconsumer.test

+ 25 - 0
tools/kafka-console-partitionconsumer/README.md

@@ -0,0 +1,25 @@
+# kafka-console-partitionconsumer
+
+A simple command line tool to consume a partition of a topic and print the messages
+on the standard output.
+
+### Installation
+
+    go install github.com/Shopify/sarama/tools/kafka-console-partitionconsumer
+
+### Usage
+
+    # Minimum invocation
+    kafka-console-partitionconsumer -topic=test -partition=4 -brokers=kafka1:9092
+
+    # It will pick up a KAFKA_PEERS environment variable
+    export KAFKA_PEERS=kafka1:9092,kafka2:9092,kafka3:9092
+    kafka-console-producer -topic=test -partition=4
+
+    # You can specify the offset you want to start at. It can be either
+    # `oldest`, `newest`, or a specific offset number
+    kafka-console-producer -topic=test -partition=3 -offset=oldest
+    kafka-console-producer -topic=test -partition=2 -offset=1337
+
+    # Display all command line options
+    kafka-console-producer -help

+ 102 - 0
tools/kafka-console-partitionconsumer/kafka-console-partitionconsumer.go

@@ -0,0 +1,102 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"os/signal"
+	"strconv"
+	"strings"
+
+	"github.com/Shopify/sarama"
+)
+
+var (
+	brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
+	topic      = flag.String("topic", "", "REQUIRED: the topic to consume")
+	partition  = flag.Int("partition", -1, "REQUIRED: the partition to consume")
+	offset     = flag.String("offset", "newest", "The offset to start with. Can be `oldest`, `newest`, or an actual offset")
+	verbose    = flag.Bool("verbose", false, "Whether to turn on sarama logging")
+
+	logger = log.New(os.Stderr, "", log.LstdFlags)
+)
+
+func main() {
+	flag.Parse()
+
+	if *brokerList == "" {
+		printUsageErrorAndExit("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
+	}
+
+	if *topic == "" {
+		printUsageErrorAndExit("-topic is required")
+	}
+
+	if *partition == -1 {
+		printUsageErrorAndExit("-partition is required")
+	}
+
+	if *verbose {
+		sarama.Logger = logger
+	}
+
+	var (
+		initialOffset int64
+		offsetError   error
+	)
+	switch *offset {
+	case "oldest":
+		initialOffset = sarama.OffsetOldest
+	case "newest":
+		initialOffset = sarama.OffsetNewest
+	default:
+		initialOffset, offsetError = strconv.ParseInt(*offset, 10, 64)
+	}
+
+	if offsetError != nil {
+		printUsageErrorAndExit("Invalid initial offset: %s", *offset)
+	}
+
+	c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), nil)
+	if err != nil {
+		printErrorAndExit(69, "Failed to start consumer: %s", err)
+	}
+
+	pc, err := c.ConsumePartition(*topic, int32(*partition), initialOffset)
+	if err != nil {
+		printErrorAndExit(69, "Failed to start partition consumer: %s", err)
+	}
+
+	go func() {
+		signals := make(chan os.Signal, 1)
+		signal.Notify(signals, os.Kill, os.Interrupt)
+		<-signals
+		pc.AsyncClose()
+	}()
+
+	for msg := range pc.Messages() {
+		fmt.Printf("Offset:\t%d\n", msg.Offset)
+		fmt.Printf("Key:\t%s\n", string(msg.Key))
+		fmt.Printf("Value:\t%s\n", string(msg.Value))
+		fmt.Println()
+	}
+
+	if err := c.Close(); err != nil {
+		logger.Println("Failed to close consumer: ", err)
+	}
+}
+
+func printErrorAndExit(code int, format string, values ...interface{}) {
+	fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
+	fmt.Fprintln(os.Stderr)
+	os.Exit(code)
+}
+
+func printUsageErrorAndExit(format string, values ...interface{}) {
+	fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
+	fmt.Fprintln(os.Stderr)
+	fmt.Fprintln(os.Stderr, "Available command line options:")
+	flag.PrintDefaults()
+	os.Exit(64)
+}