| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- package main
- import (
- "flag"
- "github.com/Shopify/sarama"
- "log"
- "os"
- "strings"
- "time"
- )
// brokerList holds the value of the -brokers flag: a comma separated list of
// Kafka broker addresses. Its default is read from the KAFKA_PEERS
// environment variable at package initialisation time.
var brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")

// topic holds the value of the -topic flag: the name of the topic to consume.
// It defaults to the empty string; main rejects an empty topic.
var topic = flag.String("topic", "", "REQUIRED: the topic to consume")
- func main() {
- flag.Parse()
- if *brokerList == "" {
- log.Fatal("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
- }
- if *topic == "" {
- log.Fatal("-topic is required")
- }
- c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), nil)
- if err != nil {
- log.Fatalf("Failed to start consumer: %s", err)
- }
- partitionList, err := c.Partitions(*topic)
- if err != nil {
- log.Fatalf("Failed to get the list of partitions: %s", err)
- }
- for _, partition := range partitionList {
- pc, err := c.ConsumePartition(*topic, partition, sarama.OffsetOldest)
- if err != nil {
- log.Fatalf("Failed to start consumer for partition %d: %s", partition, err)
- }
- go func() {
- for err := range pc.Errors() {
- log.Fatal(err)
- }
- }()
- msgChannel := pc.Messages()
- read1Partition:
- for {
- timeout := time.NewTimer(1 * time.Second)
- select {
- case msg, open := <-msgChannel:
- if !open {
- log.Println("channel message is closed")
- break read1Partition
- }
- log.Println(string(msg.Value))
- case <-timeout.C:
- break read1Partition
- }
- }
- }
- log.Println("Done consuming topic", *topic)
- if err := c.Close(); err != nil {
- log.Println("Failed to close consumer: ", err)
- }
- }
|