main.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package main
import (
	"context"
	"flag"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
)
  12. // Sarma configuration options
  13. var (
  14. brokers = ""
  15. version = ""
  16. group = ""
  17. topics = ""
  18. oldest = true
  19. verbose = false
  20. )
  21. func init() {
  22. flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
  23. flag.StringVar(&group, "group", "", "Kafka consumer group definition")
  24. flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version")
  25. flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma seperated list")
  26. flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial ofset from oldest")
  27. flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
  28. flag.Parse()
  29. if len(brokers) == 0 {
  30. panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
  31. }
  32. if len(topics) == 0 {
  33. panic("no topics given to be consumed, please set the -topics flag")
  34. }
  35. if len(group) == 0 {
  36. panic("no Kafka consumer group defined, please set the -group flag")
  37. }
  38. }
  39. func main() {
  40. log.Println("Starting a new Sarama consumer")
  41. if verbose {
  42. sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
  43. }
  44. version, err := sarama.ParseKafkaVersion(version)
  45. if err != nil {
  46. panic(err)
  47. }
  48. /**
  49. * Construct a new Sarama configuration.
  50. * The Kafka cluster version has to be defined before the consumer/producer is initialized.
  51. */
  52. config := sarama.NewConfig()
  53. config.Version = version
  54. if oldest {
  55. config.Consumer.Offsets.Initial = sarama.OffsetOldest
  56. }
  57. /**
  58. * Setup a new Sarama consumer group
  59. */
  60. consumer := Consumer{
  61. ready: make(chan bool, 0),
  62. }
  63. ctx := context.Background()
  64. client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
  65. if err != nil {
  66. panic(err)
  67. }
  68. go func() {
  69. for {
  70. err := client.Consume(ctx, strings.Split(topics, ","), &consumer)
  71. if err != nil {
  72. panic(err)
  73. }
  74. }
  75. }()
  76. <-consumer.ready // Await till the consumer has been set up
  77. log.Println("Sarama consumer up and running!...")
  78. sigterm := make(chan os.Signal, 1)
  79. signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
  80. <-sigterm // Await a sigterm signal before safely closing the consumer
  81. err = client.Close()
  82. if err != nil {
  83. panic(err)
  84. }
  85. }
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	// ready is closed by Setup to signal that a session has started; main
	// blocks on it before reporting that the consumer is up.
	ready chan bool
}
  90. // Setup is run at the beginning of a new session, before ConsumeClaim
  91. func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
  92. // Mark the consumer as ready
  93. close(consumer.ready)
  94. return nil
  95. }
  96. // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
  97. func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
  98. return nil
  99. }
  100. // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
  101. func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  102. // NOTE:
  103. // Do not move the code below to a goroutine.
  104. // The `ConsumeClaim` itself is called within a goroutine, see:
  105. // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
  106. for message := range claim.Messages() {
  107. log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
  108. session.MarkMessage(message, "")
  109. }
  110. return nil
  111. }