|
@@ -0,0 +1,131 @@
|
|
|
+package main
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "flag"
|
|
|
+ "log"
|
|
|
+ "os"
|
|
|
+ "os/signal"
|
|
|
+ "strings"
|
|
|
+ "syscall"
|
|
|
+
|
|
|
+ "github.com/Shopify/sarama"
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+var (
|
|
|
+ brokers = ""
|
|
|
+ version = ""
|
|
|
+ group = ""
|
|
|
+ topics = ""
|
|
|
+ oldest = true
|
|
|
+ verbose = false
|
|
|
+)
|
|
|
+
|
|
|
+func init() {
|
|
|
+ flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
|
|
|
+ flag.StringVar(&group, "group", "", "Kafka consumer group definition")
|
|
|
+ flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version")
|
|
|
+ flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma seperated list")
|
|
|
+ flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial ofset from oldest")
|
|
|
+ flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
|
|
|
+ flag.Parse()
|
|
|
+
|
|
|
+ if len(brokers) == 0 {
|
|
|
+ panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(topics) == 0 {
|
|
|
+ panic("no topics given to be consumed, please set the -topics flag")
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(group) == 0 {
|
|
|
+ panic("no Kafka consumer group defined, please set the -group flag")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func main() {
|
|
|
+ log.Println("Starting a new Sarama consumer")
|
|
|
+
|
|
|
+ if verbose {
|
|
|
+ sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
|
|
|
+ }
|
|
|
+
|
|
|
+ version, err := sarama.ParseKafkaVersion(version)
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * Construct a new Sarama configuration.
|
|
|
+ * The Kafka cluster version has to be defined before the consumer/producer is initialized.
|
|
|
+ */
|
|
|
+ config := sarama.NewConfig()
|
|
|
+ config.Version = version
|
|
|
+
|
|
|
+ if oldest {
|
|
|
+ config.Consumer.Offsets.Initial = sarama.OffsetOldest
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * Setup a new Sarama consumer group
|
|
|
+ */
|
|
|
+ consumer := Consumer{
|
|
|
+ ready: make(chan bool, 0),
|
|
|
+ }
|
|
|
+
|
|
|
+ ctx := context.Background()
|
|
|
+ client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ for {
|
|
|
+ err := client.Consume(ctx, strings.Split(topics, ","), &consumer)
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ <-consumer.ready
|
|
|
+ log.Println("Sarama consumer up and running!...")
|
|
|
+
|
|
|
+ sigterm := make(chan os.Signal, 1)
|
|
|
+ signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
|
|
|
+
|
|
|
+ <-sigterm
|
|
|
+
|
|
|
+ err = client.Close()
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type Consumer struct {
|
|
|
+ ready chan bool
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
|
|
|
+
|
|
|
+ close(consumer.ready)
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
|
|
+ for message := range claim.Messages() {
|
|
|
+ log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
|
|
|
+ session.MarkMessage(message, "")
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|