kafka-console-consumer.go 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package main
  import (
  	"flag"
  	"log"
  	"os"
  	"os/signal"
  	"strings"
  	"sync"
  	"time"

  	"github.com/Shopify/sarama"
  )
  // Command-line flags. Both values are checked for emptiness in main once
  // flag.Parse has run.
  var (
  	// topic is the name of the Kafka topic to read. It defaults to the empty
  	// string, so it has to be passed explicitly.
  	topic = flag.String("topic", "", "REQUIRED: the topic to consume")

  	// brokerList is a comma-separated list of broker addresses. When the flag
  	// is omitted it falls back to the KAFKA_PEERS environment variable.
  	brokerList = flag.String(
  		"brokers",
  		os.Getenv("KAFKA_PEERS"),
  		"The comma separated list of brokers in the Kafka cluster",
  	)
  )
  14. func main() {
  15. flag.Parse()
  16. if *brokerList == "" {
  17. log.Fatal("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
  18. }
  19. if *topic == "" {
  20. log.Fatal("-topic is required")
  21. }
  22. config := sarama.NewConfig()
  23. config.Version = sarama.V1_1_1_0
  24. config.Consumer.IsolationLevel = sarama.ReadCommitted
  25. config.Consumer.MaxProcessingTime = 20 * 365 * 24 * time.Hour
  26. c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), config)
  27. if err != nil {
  28. log.Fatalf("Failed to start consumer: %s", err)
  29. }
  30. partitionList, err := c.Partitions(*topic)
  31. if err != nil {
  32. log.Fatalf("Failed to get the list of partitions: %s", err)
  33. }
  34. for _, partition := range partitionList {
  35. pc, err := c.ConsumePartition(*topic, partition, sarama.OffsetOldest)
  36. if err != nil {
  37. log.Fatalf("Failed to start consumer for partition %d: %s", partition, err)
  38. }
  39. go func() {
  40. for err := range pc.Errors() {
  41. log.Fatal(err)
  42. }
  43. }()
  44. msgChannel := pc.Messages()
  45. read1Partition:
  46. for {
  47. //timeout := time.NewTimer(1 * time.Second)
  48. select {
  49. case msg, open := <-msgChannel:
  50. if !open {
  51. log.Println("channel message is closed")
  52. break read1Partition
  53. }
  54. log.Println(string(msg.Value))
  55. //case <-timeout.C:
  56. // break read1Partition
  57. }
  58. }
  59. }
  60. log.Println("Done consuming topic", *topic)
  61. if err := c.Close(); err != nil {
  62. log.Println("Failed to close consumer: ", err)
  63. }
  64. }