kafka-console-consumer.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "log"
  6. "os"
  7. "os/signal"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "syscall"
  12. "github.com/Shopify/sarama"
  13. "github.com/Shopify/sarama/tools/tls"
  14. )
  // Command-line flags and the process-wide logger.
  //
  // Usage strings deliberately use single quotes around literal values: the
  // flag package treats the first back-quoted word in a usage string as the
  // placeholder name for the flag's argument, which would otherwise make
  // PrintDefaults render "-offset oldest" instead of "-offset string".
  var (
  	// Connection and subscription settings.
  	brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
  	topic      = flag.String("topic", "", "REQUIRED: the topic to consume")
  	partitions = flag.String("partitions", "all", "The partitions to consume, can be 'all' or comma-separated numbers")
  	offset     = flag.String("offset", "newest", "The offset to start with. Can be 'oldest' or 'newest'")
  	verbose    = flag.Bool("verbose", false, "Whether to turn on sarama logging")

  	// TLS settings; the client cert/key are only read when -tls-enabled is set.
  	tlsEnabled    = flag.Bool("tls-enabled", false, "Whether to enable TLS")
  	tlsSkipVerify = flag.Bool("tls-skip-verify", false, "Whether to skip TLS server cert verification")
  	tlsClientCert = flag.String("tls-client-cert", "", "Client cert for client authentication (use with -tls-enabled and -tls-client-key)")
  	tlsClientKey  = flag.String("tls-client-key", "", "Client key for client authentication (use with -tls-enabled and -tls-client-cert)")

  	// Capacity of the channel that funnels messages from all partitions to the printer.
  	bufferSize = flag.Int("buffer-size", 256, "The buffer size of the message channel.")

  	// logger writes to stderr so diagnostics never mix with consumed messages on stdout.
  	logger = log.New(os.Stderr, "", log.LstdFlags)
  )
  28. func main() {
  29. flag.Parse()
  30. if *brokerList == "" {
  31. printUsageErrorAndExit("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
  32. }
  33. if *topic == "" {
  34. printUsageErrorAndExit("-topic is required")
  35. }
  36. if *verbose {
  37. sarama.Logger = logger
  38. }
  39. var initialOffset int64
  40. switch *offset {
  41. case "oldest":
  42. initialOffset = sarama.OffsetOldest
  43. case "newest":
  44. initialOffset = sarama.OffsetNewest
  45. default:
  46. printUsageErrorAndExit("-offset should be `oldest` or `newest`")
  47. }
  48. config := sarama.NewConfig()
  49. if *tlsEnabled {
  50. tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
  51. if err != nil {
  52. printErrorAndExit(69, "Failed to create TLS config: %s", err)
  53. }
  54. config.Net.TLS.Enable = true
  55. config.Net.TLS.Config = tlsConfig
  56. config.Net.TLS.Config.InsecureSkipVerify = *tlsSkipVerify
  57. }
  58. c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), config)
  59. if err != nil {
  60. printErrorAndExit(69, "Failed to start consumer: %s", err)
  61. }
  62. partitionList, err := getPartitions(c)
  63. if err != nil {
  64. printErrorAndExit(69, "Failed to get the list of partitions: %s", err)
  65. }
  66. var (
  67. messages = make(chan *sarama.ConsumerMessage, *bufferSize)
  68. closing = make(chan struct{})
  69. wg sync.WaitGroup
  70. )
  71. go func() {
  72. signals := make(chan os.Signal, 1)
  73. signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
  74. <-signals
  75. logger.Println("Initiating shutdown of consumer...")
  76. close(closing)
  77. }()
  78. for _, partition := range partitionList {
  79. pc, err := c.ConsumePartition(*topic, partition, initialOffset)
  80. if err != nil {
  81. printErrorAndExit(69, "Failed to start consumer for partition %d: %s", partition, err)
  82. }
  83. go func(pc sarama.PartitionConsumer) {
  84. <-closing
  85. pc.AsyncClose()
  86. }(pc)
  87. wg.Add(1)
  88. go func(pc sarama.PartitionConsumer) {
  89. defer wg.Done()
  90. for message := range pc.Messages() {
  91. messages <- message
  92. }
  93. }(pc)
  94. }
  95. go func() {
  96. for msg := range messages {
  97. fmt.Printf("Partition:\t%d\n", msg.Partition)
  98. fmt.Printf("Offset:\t%d\n", msg.Offset)
  99. fmt.Printf("Key:\t%s\n", string(msg.Key))
  100. fmt.Printf("Value:\t%s\n", string(msg.Value))
  101. fmt.Println()
  102. }
  103. }()
  104. wg.Wait()
  105. logger.Println("Done consuming topic", *topic)
  106. close(messages)
  107. if err := c.Close(); err != nil {
  108. logger.Println("Failed to close consumer: ", err)
  109. }
  110. }
  111. func getPartitions(c sarama.Consumer) ([]int32, error) {
  112. if *partitions == "all" {
  113. return c.Partitions(*topic)
  114. }
  115. tmp := strings.Split(*partitions, ",")
  116. var pList []int32
  117. for i := range tmp {
  118. val, err := strconv.ParseInt(tmp[i], 10, 32)
  119. if err != nil {
  120. return nil, err
  121. }
  122. pList = append(pList, int32(val))
  123. }
  124. return pList, nil
  125. }
  // printErrorAndExit formats the given message, writes it to stderr prefixed
  // with "ERROR: " and followed by a blank line, then terminates the process
  // with the supplied exit code. It never returns.
  func printErrorAndExit(code int, format string, values ...interface{}) {
  	out := os.Stderr
  	message := fmt.Sprintf(format, values...)

  	fmt.Fprintf(out, "ERROR: %s\n", message)
  	fmt.Fprintln(out)

  	os.Exit(code)
  }
  // printUsageErrorAndExit reports a command-line usage problem: it writes the
  // formatted message to stderr prefixed with "ERROR: ", followed by a blank
  // line and the list of available flags, then terminates the process with
  // exit status 64. It never returns.
  func printUsageErrorAndExit(format string, values ...interface{}) {
  	out := os.Stderr
  	message := fmt.Sprintf(format, values...)

  	fmt.Fprintf(out, "ERROR: %s\n", message)
  	fmt.Fprintln(out)
  	fmt.Fprintln(out, "Available command line options:")
  	flag.PrintDefaults()

  	os.Exit(64)
  }