main.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "log"
  6. "os"
  7. "os/signal"
  8. "strings"
  9. "sync"
  10. "syscall"
  11. "github.com/Shopify/sarama"
  12. )
  // Command-line settings for the Sarama consumer. The values written here are
  // only the pre-flag state of each variable.
  var (
  	brokers  string // comma separated list of Kafka bootstrap brokers
  	version  string // Kafka cluster version, parsed by sarama.ParseKafkaVersion in main
  	group    string // Kafka consumer group definition
  	topics   string // comma separated list of topics to consume
  	assignor string // partition assignment strategy: range, roundrobin or sticky
  	oldest   = true // when true, main sets the initial offset to sarama.OffsetOldest
  	verbose  bool   // when true, main enables Sarama's own logging
  )
  23. func init() {
  24. flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
  25. flag.StringVar(&group, "group", "", "Kafka consumer group definition")
  26. flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version")
  27. flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma separated list")
  28. flag.StringVar(&assignor, "assignor", "range", "Consumer group partition assignment strategy (range, roundrobin, sticky)")
  29. flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial offset from oldest")
  30. flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
  31. flag.Parse()
  32. if len(brokers) == 0 {
  33. panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
  34. }
  35. if len(topics) == 0 {
  36. panic("no topics given to be consumed, please set the -topics flag")
  37. }
  38. if len(group) == 0 {
  39. panic("no Kafka consumer group defined, please set the -group flag")
  40. }
  41. }
  42. func main() {
  43. log.Println("Starting a new Sarama consumer")
  44. if verbose {
  45. sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
  46. }
  47. version, err := sarama.ParseKafkaVersion(version)
  48. if err != nil {
  49. log.Panicf("Error parsing Kafka version: %v", err)
  50. }
  51. /**
  52. * Construct a new Sarama configuration.
  53. * The Kafka cluster version has to be defined before the consumer/producer is initialized.
  54. */
  55. config := sarama.NewConfig()
  56. config.Version = version
  57. switch assignor {
  58. case "sticky":
  59. config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
  60. case "roundrobin":
  61. config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
  62. case "range":
  63. config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
  64. default:
  65. log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
  66. }
  67. if oldest {
  68. config.Consumer.Offsets.Initial = sarama.OffsetOldest
  69. }
  70. /**
  71. * Setup a new Sarama consumer group
  72. */
  73. consumer := Consumer{
  74. ready: make(chan bool),
  75. }
  76. ctx, cancel := context.WithCancel(context.Background())
  77. client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
  78. if err != nil {
  79. log.Panicf("Error creating consumer group client: %v", err)
  80. }
  81. wg := &sync.WaitGroup{}
  82. wg.Add(1)
  83. go func() {
  84. defer wg.Done()
  85. for {
  86. // `Consume` should be called inside an infinite loop, when a
  87. // server-side rebalance happens, the consumer session will need to be
  88. // recreated to get the new claims
  89. if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
  90. log.Panicf("Error from consumer: %v", err)
  91. }
  92. // check if context was cancelled, signaling that the consumer should stop
  93. if ctx.Err() != nil {
  94. return
  95. }
  96. consumer.ready = make(chan bool)
  97. }
  98. }()
  99. <-consumer.ready // Await till the consumer has been set up
  100. log.Println("Sarama consumer up and running!...")
  101. sigterm := make(chan os.Signal, 1)
  102. signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
  103. select {
  104. case <-ctx.Done():
  105. log.Println("terminating: context cancelled")
  106. case <-sigterm:
  107. log.Println("terminating: via signal")
  108. }
  109. cancel()
  110. wg.Wait()
  111. if err = client.Close(); err != nil {
  112. log.Panicf("Error closing client: %v", err)
  113. }
  114. }
  // Consumer is the handler type this program passes to the Sarama consumer
  // group; its methods implement the Setup, Cleanup and ConsumeClaim callbacks.
  type Consumer struct {
  	// ready is closed by Setup to signal that a session has started; main
  	// blocks on it before announcing that the consumer is running.
  	ready chan bool
  }
  119. // Setup is run at the beginning of a new session, before ConsumeClaim
  120. func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
  121. // Mark the consumer as ready
  122. close(consumer.ready)
  123. return nil
  124. }
  125. // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
  126. func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
  127. return nil
  128. }
  129. // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
  130. func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  131. // NOTE:
  132. // Do not move the code below to a goroutine.
  133. // The `ConsumeClaim` itself is called within a goroutine, see:
  134. // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
  135. for message := range claim.Messages() {
  136. log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
  137. session.MarkMessage(message, "")
  138. }
  139. return nil
  140. }