kafka-console-consumer.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package main
  2. import (
  3. "flag"
  4. "log"
  5. "os"
  6. "strings"
  7. "time"
  8. // when testing locally, it is way faster to edit this to reference your local repo if different
  9. // if you want to do it properly but slowly you may use some dependency manager magic
  10. "github.com/FrancoisPoinsot/sarama"
  11. )
  12. var (
  13. brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
  14. topic = flag.String("topic", "", "REQUIRED: the topic to consume")
  15. )
  16. func main() {
  17. flag.Parse()
  18. if *brokerList == "" {
  19. log.Fatal("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
  20. }
  21. if *topic == "" {
  22. log.Fatal("-topic is required")
  23. }
  24. config := sarama.NewConfig()
  25. config.Version = sarama.V1_1_1_0
  26. config.Consumer.IsolationLevel = sarama.ReadCommitted
  27. config.Consumer.MaxProcessingTime = 20 * 365 * 24 * time.Hour
  28. c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), config)
  29. if err != nil {
  30. log.Fatalf("Failed to start consumer: %s", err)
  31. }
  32. partitionList, err := c.Partitions(*topic)
  33. if err != nil {
  34. log.Fatalf("Failed to get the list of partitions: %s", err)
  35. }
  36. for _, partition := range partitionList {
  37. pc, err := c.ConsumePartition(*topic, partition, sarama.OffsetOldest)
  38. if err != nil {
  39. log.Fatalf("Failed to start consumer for partition %d: %s", partition, err)
  40. }
  41. go func() {
  42. for err := range pc.Errors() {
  43. log.Fatal(err)
  44. }
  45. }()
  46. msgChannel := pc.Messages()
  47. read1Partition:
  48. for {
  49. //timeout := time.NewTimer(1 * time.Second)
  50. select {
  51. case msg, open := <-msgChannel:
  52. if !open {
  53. log.Println("channel message is closed")
  54. break read1Partition
  55. }
  56. log.Println(string(msg.Value))
  57. //case <-timeout.C:
  58. // break read1Partition
  59. }
  60. }
  61. }
  62. log.Println("Done consuming topic", *topic)
  63. if err := c.Close(); err != nil {
  64. log.Println("Failed to close consumer: ", err)
  65. }
  66. }