123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- package main
- import (
- "flag"
- "fmt"
- "log"
- "os"
- "os/signal"
- "strconv"
- "strings"
- "syscall"
- "github.com/Shopify/sarama"
- )
- var (
- brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
- topic = flag.String("topic", "", "REQUIRED: the topic to consume")
- partition = flag.Int("partition", -1, "REQUIRED: the partition to consume")
- offset = flag.String("offset", "newest", "The offset to start with. Can be `oldest`, `newest`, or an actual offset")
- verbose = flag.Bool("verbose", false, "Whether to turn on sarama logging")
- logger = log.New(os.Stderr, "", log.LstdFlags)
- )
- func main() {
- flag.Parse()
- if *brokerList == "" {
- printUsageErrorAndExit("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
- }
- if *topic == "" {
- printUsageErrorAndExit("-topic is required")
- }
- if *partition == -1 {
- printUsageErrorAndExit("-partition is required")
- }
- if *verbose {
- sarama.Logger = logger
- }
- var (
- initialOffset int64
- offsetError error
- )
- switch *offset {
- case "oldest":
- initialOffset = sarama.OffsetOldest
- case "newest":
- initialOffset = sarama.OffsetNewest
- default:
- initialOffset, offsetError = strconv.ParseInt(*offset, 10, 64)
- }
- if offsetError != nil {
- printUsageErrorAndExit("Invalid initial offset: %s", *offset)
- }
- c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), nil)
- if err != nil {
- printErrorAndExit(69, "Failed to start consumer: %s", err)
- }
- pc, err := c.ConsumePartition(*topic, int32(*partition), initialOffset)
- if err != nil {
- printErrorAndExit(69, "Failed to start partition consumer: %s", err)
- }
- go func() {
- signals := make(chan os.Signal, 1)
- signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
- <-signals
- pc.AsyncClose()
- }()
- for msg := range pc.Messages() {
- fmt.Printf("Offset:\t%d\n", msg.Offset)
- fmt.Printf("Key:\t%s\n", string(msg.Key))
- fmt.Printf("Value:\t%s\n", string(msg.Value))
- fmt.Println()
- }
- if err := c.Close(); err != nil {
- logger.Println("Failed to close consumer: ", err)
- }
- }
- func printErrorAndExit(code int, format string, values ...interface{}) {
- fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
- fmt.Fprintln(os.Stderr)
- os.Exit(code)
- }
- func printUsageErrorAndExit(format string, values ...interface{}) {
- fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
- fmt.Fprintln(os.Stderr)
- fmt.Fprintln(os.Stderr, "Available command line options:")
- flag.PrintDefaults()
- os.Exit(64)
- }
|