Browse Source

Merge pull request #439 from Shopify/topic_consumer_tool

Add a simple topic consumer tool
Willem van Bergen 10 years ago
parent
commit
b5ea19584e

+ 2 - 0
tools/kafka-console-topicconsumer/.gitignore

@@ -0,0 +1,2 @@
+kafka-console-topicconsumer
+kafka-console-topicconsumer.test

+ 25 - 0
tools/kafka-console-topicconsumer/README.md

@@ -0,0 +1,25 @@
+# kafka-console-topicconsumer
+
+A simple command line tool to consume all partitions a topic and print the messages
+on the standard output.
+
+### Installation
+
+    go install github.com/Shopify/sarama/tools/kafka-console-topicconsumer
+
+### Usage
+
+    # Minimum invocation
+    kafka-console-topicconsumer -topic=test -brokers=kafka1:9092
+
+    # It will pick up a KAFKA_PEERS environment variable
+    export KAFKA_PEERS=kafka1:9092,kafka2:9092,kafka3:9092
+    kafka-console-topicconsumer -topic=test
+
+    # You can specify the offset you want to start at. It can be either
+    # `oldest`, `newest`. The default is `newest`.
+    kafka-console-topicconsumer -topic=test -offset=oldest
+    kafka-console-topicconsumer -topic=test -offset=newest
+
+    # Display all command line options
+    kafka-console-topicconsumer -help

+ 125 - 0
tools/kafka-console-topicconsumer/kafka-console-topicconsumer.go

@@ -0,0 +1,125 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"os/signal"
+	"strings"
+	"sync"
+
+	"github.com/Shopify/sarama"
+)
+
+var (
+	brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
+	topic      = flag.String("topic", "", "REQUIRED: the topic to consume")
+	offset     = flag.String("offset", "newest", "The offset to start with. Can be `oldest`, `newest`")
+	verbose    = flag.Bool("verbose", false, "Whether to turn on sarama logging")
+	bufferSize = flag.Int("buffer-size", 256, "The buffer size of the message channel.")
+
+	logger = log.New(os.Stderr, "", log.LstdFlags)
+)
+
+func main() {
+	flag.Parse()
+
+	if *brokerList == "" {
+		printUsageErrorAndExit("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
+	}
+
+	if *topic == "" {
+		printUsageErrorAndExit("-topic is required")
+	}
+
+	if *verbose {
+		sarama.Logger = logger
+	}
+
+	var initialOffset int64
+	switch *offset {
+	case "oldest":
+		initialOffset = sarama.OffsetOldest
+	case "newest":
+		initialOffset = sarama.OffsetNewest
+	default:
+		printUsageErrorAndExit("-offset should be `oldest` or `newest`")
+	}
+
+	c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), nil)
+	if err != nil {
+		printErrorAndExit(69, "Failed to start consumer: %s", err)
+	}
+
+	partitions, err := c.Partitions(*topic)
+	if err != nil {
+		printErrorAndExit(69, "Failed to get the list of partitions: %s", err)
+	}
+
+	var (
+		messages = make(chan *sarama.ConsumerMessage, *bufferSize)
+		closing  = make(chan struct{})
+		wg       sync.WaitGroup
+	)
+
+	go func() {
+		signals := make(chan os.Signal, 1)
+		signal.Notify(signals, os.Kill, os.Interrupt)
+		<-signals
+		logger.Println("Initiating shutdown of consumer...")
+		close(closing)
+	}()
+
+	for _, partition := range partitions {
+		pc, err := c.ConsumePartition(*topic, partition, initialOffset)
+		if err != nil {
+			printErrorAndExit(69, "Failed to start consumer for partition %d: %s", partition, err)
+		}
+
+		go func(pc sarama.PartitionConsumer) {
+			<-closing
+			pc.AsyncClose()
+		}(pc)
+
+		wg.Add(1)
+		go func(pc sarama.PartitionConsumer) {
+			defer wg.Done()
+			for message := range pc.Messages() {
+				messages <- message
+			}
+		}(pc)
+	}
+
+	go func() {
+		for msg := range messages {
+			fmt.Printf("Partition:\t%d\n", msg.Partition)
+			fmt.Printf("Offset:\t%d\n", msg.Offset)
+			fmt.Printf("Key:\t%s\n", string(msg.Key))
+			fmt.Printf("Value:\t%s\n", string(msg.Value))
+			fmt.Println()
+		}
+	}()
+
+	wg.Wait()
+	logger.Println("Done consuming topic", *topic)
+	close(messages)
+
+	if err := c.Close(); err != nil {
+		logger.Println("Failed to close consumer: ", err)
+	}
+}
+
+func printErrorAndExit(code int, format string, values ...interface{}) {
+	fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
+	fmt.Fprintln(os.Stderr)
+	os.Exit(code)
+}
+
+func printUsageErrorAndExit(format string, values ...interface{}) {
+	fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
+	fmt.Fprintln(os.Stderr)
+	fmt.Fprintln(os.Stderr, "Available command line options:")
+	flag.PrintDefaults()
+	os.Exit(64)
+}