kafka-console-producer.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "io/ioutil"
  6. "log"
  7. "os"
  8. "strings"
  9. "github.com/rcrowley/go-metrics"
  10. "github.com/Shopify/sarama"
  11. "github.com/Shopify/sarama/tools/tls"
  12. )
  13. var (
  14. brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster. You can also set the KAFKA_PEERS environment variable")
  15. headers = flag.String("headers", "", "The headers of the message to produce. Example: -headers=foo:bar,bar:foo")
  16. topic = flag.String("topic", "", "REQUIRED: the topic to produce to")
  17. key = flag.String("key", "", "The key of the message to produce. Can be empty.")
  18. value = flag.String("value", "", "REQUIRED: the value of the message to produce. You can also provide the value on stdin.")
  19. partitioner = flag.String("partitioner", "", "The partitioning scheme to use. Can be `hash`, `manual`, or `random`")
  20. partition = flag.Int("partition", -1, "The partition to produce to.")
  21. verbose = flag.Bool("verbose", false, "Turn on sarama logging to stderr")
  22. showMetrics = flag.Bool("metrics", false, "Output metrics on successful publish to stderr")
  23. silent = flag.Bool("silent", false, "Turn off printing the message's topic, partition, and offset to stdout")
  24. tlsEnabled = flag.Bool("tls-enabled", false, "Whether to enable TLS")
  25. tlsSkipVerify = flag.Bool("tls-skip-verify", false, "Whether skip TLS server cert verification")
  26. tlsClientCert = flag.String("tls-client-cert", "", "Client cert for client authentication (use with -tls-enabled and -tls-client-key)")
  27. tlsClientKey = flag.String("tls-client-key", "", "Client key for client authentication (use with tls-enabled and -tls-client-cert)")
  28. logger = log.New(os.Stderr, "", log.LstdFlags)
  29. )
  30. func main() {
  31. flag.Parse()
  32. if *brokerList == "" {
  33. printUsageErrorAndExit("no -brokers specified. Alternatively, set the KAFKA_PEERS environment variable")
  34. }
  35. if *topic == "" {
  36. printUsageErrorAndExit("no -topic specified")
  37. }
  38. if *verbose {
  39. sarama.Logger = logger
  40. }
  41. config := sarama.NewConfig()
  42. config.Producer.RequiredAcks = sarama.WaitForAll
  43. config.Producer.Return.Successes = true
  44. if *tlsEnabled {
  45. tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
  46. if err != nil {
  47. printErrorAndExit(69, "Failed to create TLS config: %s", err)
  48. }
  49. config.Net.TLS.Enable = true
  50. config.Net.TLS.Config = tlsConfig
  51. config.Net.TLS.Config.InsecureSkipVerify = *tlsSkipVerify
  52. }
  53. switch *partitioner {
  54. case "":
  55. if *partition >= 0 {
  56. config.Producer.Partitioner = sarama.NewManualPartitioner
  57. } else {
  58. config.Producer.Partitioner = sarama.NewHashPartitioner
  59. }
  60. case "hash":
  61. config.Producer.Partitioner = sarama.NewHashPartitioner
  62. case "random":
  63. config.Producer.Partitioner = sarama.NewRandomPartitioner
  64. case "manual":
  65. config.Producer.Partitioner = sarama.NewManualPartitioner
  66. if *partition == -1 {
  67. printUsageErrorAndExit("-partition is required when partitioning manually")
  68. }
  69. default:
  70. printUsageErrorAndExit(fmt.Sprintf("Partitioner %s not supported.", *partitioner))
  71. }
  72. message := &sarama.ProducerMessage{Topic: *topic, Partition: int32(*partition)}
  73. if *key != "" {
  74. message.Key = sarama.StringEncoder(*key)
  75. }
  76. if *value != "" {
  77. message.Value = sarama.StringEncoder(*value)
  78. } else if stdinAvailable() {
  79. bytes, err := ioutil.ReadAll(os.Stdin)
  80. if err != nil {
  81. printErrorAndExit(66, "Failed to read data from the standard input: %s", err)
  82. }
  83. message.Value = sarama.ByteEncoder(bytes)
  84. } else {
  85. printUsageErrorAndExit("-value is required, or you have to provide the value on stdin")
  86. }
  87. if *headers != "" {
  88. hdrs := []sarama.RecordHeader{}
  89. arrHdrs := strings.Split(*headers, ",")
  90. for _, h := range arrHdrs {
  91. if header := strings.Split(h, ":"); len(header) != 2 {
  92. printUsageErrorAndExit("-header should be key:value. Example: -headers=foo:bar,bar:foo")
  93. } else {
  94. hdrs = append(hdrs, sarama.RecordHeader{
  95. Key: []byte(header[0]),
  96. Value: []byte(header[1]),
  97. })
  98. }
  99. }
  100. if len(hdrs) != 0 {
  101. message.Headers = hdrs
  102. }
  103. }
  104. producer, err := sarama.NewSyncProducer(strings.Split(*brokerList, ","), config)
  105. if err != nil {
  106. printErrorAndExit(69, "Failed to open Kafka producer: %s", err)
  107. }
  108. defer func() {
  109. if err := producer.Close(); err != nil {
  110. logger.Println("Failed to close Kafka producer cleanly:", err)
  111. }
  112. }()
  113. partition, offset, err := producer.SendMessage(message)
  114. if err != nil {
  115. printErrorAndExit(69, "Failed to produce message: %s", err)
  116. } else if !*silent {
  117. fmt.Printf("topic=%s\tpartition=%d\toffset=%d\n", *topic, partition, offset)
  118. }
  119. if *showMetrics {
  120. metrics.WriteOnce(config.MetricRegistry, os.Stderr)
  121. }
  122. }
  123. func printErrorAndExit(code int, format string, values ...interface{}) {
  124. fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
  125. fmt.Fprintln(os.Stderr)
  126. os.Exit(code)
  127. }
  128. func printUsageErrorAndExit(message string) {
  129. fmt.Fprintln(os.Stderr, "ERROR:", message)
  130. fmt.Fprintln(os.Stderr)
  131. fmt.Fprintln(os.Stderr, "Available command line options:")
  132. flag.PrintDefaults()
  133. os.Exit(64)
  134. }
  135. func stdinAvailable() bool {
  136. stat, _ := os.Stdin.Stat()
  137. return (stat.Mode() & os.ModeCharDevice) == 0
  138. }