123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- package main
import (
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
	"github.com/Shopify/sarama/tools/tls"
)
// Command line flags and the logger shared by the whole program.
//
// Usage strings deliberately avoid backquotes: the flag package treats the
// first backquoted word of a usage string as the placeholder name of the
// flag's argument, which would render the help as "-offset oldest" and strip
// the quoting from that word only.
var (
	brokerList    = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
	topic         = flag.String("topic", "", "REQUIRED: the topic to consume")
	partitions    = flag.String("partitions", "all", "The partitions to consume, can be 'all' or comma-separated numbers")
	offset        = flag.String("offset", "newest", "The offset to start with. Can be 'oldest' or 'newest'")
	verbose       = flag.Bool("verbose", false, "Whether to turn on sarama logging")
	tlsEnabled    = flag.Bool("tls-enabled", false, "Whether to enable TLS")
	tlsSkipVerify = flag.Bool("tls-skip-verify", false, "Whether skip TLS server cert verification")
	tlsClientCert = flag.String("tls-client-cert", "", "Client cert for client authentication (use with -tls-enabled and -tls-client-key)")
	tlsClientKey  = flag.String("tls-client-key", "", "Client key for client authentication (use with -tls-enabled and -tls-client-cert)")
	bufferSize    = flag.Int("buffer-size", 256, "The buffer size of the message channel.")

	// logger writes to stderr with the standard date/time prefix.
	logger = log.New(os.Stderr, "", log.LstdFlags)
)
- func main() {
- flag.Parse()
- if *brokerList == "" {
- printUsageErrorAndExit("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
- }
- if *topic == "" {
- printUsageErrorAndExit("-topic is required")
- }
- if *verbose {
- sarama.Logger = logger
- }
- var initialOffset int64
- switch *offset {
- case "oldest":
- initialOffset = sarama.OffsetOldest
- case "newest":
- initialOffset = sarama.OffsetNewest
- default:
- printUsageErrorAndExit("-offset should be `oldest` or `newest`")
- }
- config := sarama.NewConfig()
- if *tlsEnabled {
- tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
- if err != nil {
- printErrorAndExit(69, "Failed to create TLS config: %s", err)
- }
- config.Net.TLS.Enable = true
- config.Net.TLS.Config = tlsConfig
- config.Net.TLS.Config.InsecureSkipVerify = *tlsSkipVerify
- }
- c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), config)
- if err != nil {
- printErrorAndExit(69, "Failed to start consumer: %s", err)
- }
- partitionList, err := getPartitions(c)
- if err != nil {
- printErrorAndExit(69, "Failed to get the list of partitions: %s", err)
- }
- var (
- messages = make(chan *sarama.ConsumerMessage, *bufferSize)
- closing = make(chan struct{})
- wg sync.WaitGroup
- )
- go func() {
- signals := make(chan os.Signal, 1)
- signal.Notify(signals, os.Kill, os.Interrupt)
- <-signals
- logger.Println("Initiating shutdown of consumer...")
- close(closing)
- }()
- for _, partition := range partitionList {
- pc, err := c.ConsumePartition(*topic, partition, initialOffset)
- if err != nil {
- printErrorAndExit(69, "Failed to start consumer for partition %d: %s", partition, err)
- }
- go func(pc sarama.PartitionConsumer) {
- <-closing
- pc.AsyncClose()
- }(pc)
- wg.Add(1)
- go func(pc sarama.PartitionConsumer) {
- defer wg.Done()
- for message := range pc.Messages() {
- messages <- message
- }
- }(pc)
- }
- go func() {
- for msg := range messages {
- fmt.Printf("Partition:\t%d\n", msg.Partition)
- fmt.Printf("Offset:\t%d\n", msg.Offset)
- fmt.Printf("Key:\t%s\n", string(msg.Key))
- fmt.Printf("Value:\t%s\n", string(msg.Value))
- fmt.Println()
- }
- }()
- wg.Wait()
- logger.Println("Done consuming topic", *topic)
- close(messages)
- if err := c.Close(); err != nil {
- logger.Println("Failed to close consumer: ", err)
- }
- }
- func getPartitions(c sarama.Consumer) ([]int32, error) {
- if *partitions == "all" {
- return c.Partitions(*topic)
- }
- tmp := strings.Split(*partitions, ",")
- var pList []int32
- for i := range tmp {
- val, err := strconv.ParseInt(tmp[i], 10, 32)
- if err != nil {
- return nil, err
- }
- pList = append(pList, int32(val))
- }
- return pList, nil
- }
// printErrorAndExit formats the message from format and values, writes it to
// stderr as "ERROR: <message>" followed by an empty line, and terminates the
// process with the given exit code. It never returns.
func printErrorAndExit(code int, format string, values ...interface{}) {
	msg := fmt.Sprintf(format, values...)
	fmt.Fprintln(os.Stderr, "ERROR:", msg)
	fmt.Fprintln(os.Stderr)
	os.Exit(code)
}
// printUsageErrorAndExit reports a command line usage problem. It writes
// "ERROR: <message>" and an empty line to stderr, then a heading followed by
// the defaults of all registered flags, and terminates the process with exit
// code 64. It never returns.
func printUsageErrorAndExit(format string, values ...interface{}) {
	const usageExitCode = 64

	msg := fmt.Sprintf(format, values...)
	fmt.Fprint(os.Stderr, "ERROR: "+msg+"\n\n")
	fmt.Fprintln(os.Stderr, "Available command line options:")
	flag.PrintDefaults()
	os.Exit(usageExitCode)
}
|