kafka-console-producer.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "io/ioutil"
  6. "log"
  7. "os"
  8. "strings"
  9. "github.com/Shopify/sarama"
  10. "github.com/rcrowley/go-metrics"
  11. )
  12. var (
  13. brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster. You can also set the KAFKA_PEERS environment variable")
  14. topic = flag.String("topic", "", "REQUIRED: the topic to produce to")
  15. key = flag.String("key", "", "The key of the message to produce. Can be empty.")
  16. value = flag.String("value", "", "REQUIRED: the value of the message to produce. You can also provide the value on stdin.")
  17. partitioner = flag.String("partitioner", "", "The partitioning scheme to use. Can be `hash`, `manual`, or `random`")
  18. partition = flag.Int("partition", -1, "The partition to produce to.")
  19. verbose = flag.Bool("verbose", false, "Turn on sarama logging to stderr")
  20. showMetrics = flag.Bool("metrics", false, "Output metrics on successful publish to stderr")
  21. silent = flag.Bool("silent", false, "Turn off printing the message's topic, partition, and offset to stdout")
  22. logger = log.New(os.Stderr, "", log.LstdFlags)
  23. )
  24. func main() {
  25. flag.Parse()
  26. if *brokerList == "" {
  27. printUsageErrorAndExit("no -brokers specified. Alternatively, set the KAFKA_PEERS environment variable")
  28. }
  29. if *topic == "" {
  30. printUsageErrorAndExit("no -topic specified")
  31. }
  32. if *verbose {
  33. sarama.Logger = logger
  34. }
  35. config := sarama.NewConfig()
  36. config.Producer.RequiredAcks = sarama.WaitForAll
  37. config.Producer.Return.Successes = true
  38. switch *partitioner {
  39. case "":
  40. if *partition >= 0 {
  41. config.Producer.Partitioner = sarama.NewManualPartitioner
  42. } else {
  43. config.Producer.Partitioner = sarama.NewHashPartitioner
  44. }
  45. case "hash":
  46. config.Producer.Partitioner = sarama.NewHashPartitioner
  47. case "random":
  48. config.Producer.Partitioner = sarama.NewRandomPartitioner
  49. case "manual":
  50. config.Producer.Partitioner = sarama.NewManualPartitioner
  51. if *partition == -1 {
  52. printUsageErrorAndExit("-partition is required when partitioning manually")
  53. }
  54. default:
  55. printUsageErrorAndExit(fmt.Sprintf("Partitioner %s not supported.", *partitioner))
  56. }
  57. message := &sarama.ProducerMessage{Topic: *topic, Partition: int32(*partition)}
  58. if *key != "" {
  59. message.Key = sarama.StringEncoder(*key)
  60. }
  61. if *value != "" {
  62. message.Value = sarama.StringEncoder(*value)
  63. } else if stdinAvailable() {
  64. bytes, err := ioutil.ReadAll(os.Stdin)
  65. if err != nil {
  66. printErrorAndExit(66, "Failed to read data from the standard input: %s", err)
  67. }
  68. message.Value = sarama.ByteEncoder(bytes)
  69. } else {
  70. printUsageErrorAndExit("-value is required, or you have to provide the value on stdin")
  71. }
  72. producer, err := sarama.NewSyncProducer(strings.Split(*brokerList, ","), config)
  73. if err != nil {
  74. printErrorAndExit(69, "Failed to open Kafka producer: %s", err)
  75. }
  76. defer func() {
  77. if err := producer.Close(); err != nil {
  78. logger.Println("Failed to close Kafka producer cleanly:", err)
  79. }
  80. }()
  81. partition, offset, err := producer.SendMessage(message)
  82. if err != nil {
  83. printErrorAndExit(69, "Failed to produce message: %s", err)
  84. } else if !*silent {
  85. fmt.Printf("topic=%s\tpartition=%d\toffset=%d\n", *topic, partition, offset)
  86. }
  87. if *showMetrics {
  88. metrics.WriteOnce(config.MetricRegistry, os.Stderr)
  89. }
  90. }
  91. func printErrorAndExit(code int, format string, values ...interface{}) {
  92. fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
  93. fmt.Fprintln(os.Stderr)
  94. os.Exit(code)
  95. }
  96. func printUsageErrorAndExit(message string) {
  97. fmt.Fprintln(os.Stderr, "ERROR:", message)
  98. fmt.Fprintln(os.Stderr)
  99. fmt.Fprintln(os.Stderr, "Available command line options:")
  100. flag.PrintDefaults()
  101. os.Exit(64)
  102. }
  103. func stdinAvailable() bool {
  104. stat, _ := os.Stdin.Stat()
  105. return (stat.Mode() & os.ModeCharDevice) == 0
  106. }