123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- package main
- import (
- "context"
- "flag"
- "log"
- "os"
- "os/signal"
- "strings"
- "sync"
- "syscall"
- "github.com/Shopify/sarama"
- )
- // Sarama configuration options
- var (
- brokers = ""
- version = ""
- group = ""
- topics = ""
- assignor = ""
- oldest = true
- verbose = false
- )
- func init() {
- flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
- flag.StringVar(&group, "group", "", "Kafka consumer group definition")
- flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version")
- flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma seperated list")
- flag.StringVar(&assignor, "assignor", "range", "Consumer group partition assignment strategy (range, roundrobin, sticky)")
- flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial offset from oldest")
- flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
- flag.Parse()
- if len(brokers) == 0 {
- panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
- }
- if len(topics) == 0 {
- panic("no topics given to be consumed, please set the -topics flag")
- }
- if len(group) == 0 {
- panic("no Kafka consumer group defined, please set the -group flag")
- }
- }
- func main() {
- log.Println("Starting a new Sarama consumer")
- if verbose {
- sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
- }
- version, err := sarama.ParseKafkaVersion(version)
- if err != nil {
- log.Panicf("Error parsing Kafka version: %v", err)
- }
- /**
- * Construct a new Sarama configuration.
- * The Kafka cluster version has to be defined before the consumer/producer is initialized.
- */
- config := sarama.NewConfig()
- config.Version = version
- switch assignor {
- case "sticky":
- config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
- case "roundrobin":
- config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
- case "range":
- config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
- default:
- log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
- }
- if oldest {
- config.Consumer.Offsets.Initial = sarama.OffsetOldest
- }
- /**
- * Setup a new Sarama consumer group
- */
- consumer := Consumer{
- ready: make(chan bool),
- }
- ctx, cancel := context.WithCancel(context.Background())
- client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
- if err != nil {
- log.Panicf("Error creating consumer group client: %v", err)
- }
- wg := &sync.WaitGroup{}
- wg.Add(1)
- go func() {
- defer wg.Done()
- for {
- // `Consume` should be called inside an infinite loop, when a
- // server-side rebalance happens, the consumer session will need to be
- // recreated to get the new claims
- if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
- log.Panicf("Error from consumer: %v", err)
- }
- // check if context was cancelled, signaling that the consumer should stop
- if ctx.Err() != nil {
- return
- }
- consumer.ready = make(chan bool)
- }
- }()
- <-consumer.ready // Await till the consumer has been set up
- log.Println("Sarama consumer up and running!...")
- sigterm := make(chan os.Signal, 1)
- signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
- select {
- case <-ctx.Done():
- log.Println("terminating: context cancelled")
- case <-sigterm:
- log.Println("terminating: via signal")
- }
- cancel()
- wg.Wait()
- if err = client.Close(); err != nil {
- log.Panicf("Error closing client: %v", err)
- }
- }
- // Consumer represents a Sarama consumer group consumer
- type Consumer struct {
- ready chan bool
- }
- // Setup is run at the beginning of a new session, before ConsumeClaim
- func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
- // Mark the consumer as ready
- close(consumer.ready)
- return nil
- }
- // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
- func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
- return nil
- }
- // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
- func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
- // NOTE:
- // Do not move the code below to a goroutine.
- // The `ConsumeClaim` itself is called within a goroutine, see:
- // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
- for message := range claim.Messages() {
- log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
- session.MarkMessage(message, "")
- }
- return nil
- }
|