main.go

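// This program produces a fixed load of randomly generated messages to a
// Kafka topic using Sarama's async producer, then prints producer metrics
// to stdout.
//
// Example invocation (the broker address and topic name are illustrative
// placeholders, not values taken from this repository):
//
//	go run main.go \
//	    -brokers localhost:9092 \
//	    -topic test \
//	    -message-load 50000 \
//	    -message-size 100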
package main

import (
	"crypto/rand"
	"flag"
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/Shopify/sarama"
	metrics "github.com/rcrowley/go-metrics"
)

var (
	messageLoad = flag.Int(
		"message-load",
		0,
		"REQUIRED: The number of messages to produce to -topic.",
	)
	messageSize = flag.Int(
		"message-size",
		0,
		"REQUIRED: The approximate size (in bytes) of each message to produce to -topic.",
	)
	brokers = flag.String(
		"brokers",
		"",
		"REQUIRED: A comma separated list of broker addresses.",
	)
	topic = flag.String(
		"topic",
		"",
		"REQUIRED: The topic to run the performance test on.",
	)
	partition = flag.Int(
		"partition",
		-1,
		"The partition of -topic to run the performance test on.",
	)
	maxMessageBytes = flag.Int(
		"max-message-bytes",
		1000000,
		"The max permitted size of a message.",
	)
	requiredAcks = flag.Int(
		"required-acks",
		1,
		"The required number of acks needed from the broker (-1: all, 0: none, 1: local).",
	)
	timeout = flag.Duration(
		"timeout",
		10*time.Second,
		"The duration the producer will wait to receive -required-acks.",
	)
	partitioner = flag.String(
		"partitioner",
		"roundrobin",
		"The partitioning scheme to use (hash, manual, random, roundrobin).",
	)
	compression = flag.String(
		"compression",
		"none",
		"The compression method to use (none, gzip, snappy, lz4).",
	)
	flushFrequency = flag.Duration(
		"flush-frequency",
		0,
		"The best-effort frequency of flushes.",
	)
	flushBytes = flag.Int(
		"flush-bytes",
		0,
		"The best-effort number of bytes needed to trigger a flush.",
	)
	flushMessages = flag.Int(
		"flush-messages",
		0,
		"The best-effort number of messages needed to trigger a flush.",
	)
	flushMaxMessages = flag.Int(
		"flush-max-messages",
		0,
		"The maximum number of messages the producer will send in a single request.",
	)
	retryMax = flag.Int(
		"retry-max",
		3,
		"The total number of times to retry sending a message.",
	)
	retryBackoff = flag.Duration(
		"retry-backoff",
		100*time.Millisecond,
		"The duration the producer will wait for the cluster to settle between retries.",
	)
	clientID = flag.String(
		"client-id",
		"sarama",
		"The client ID sent with every request to the brokers.",
	)
	channelBufferSize = flag.Int(
		"channel-buffer-size",
		256,
		"The number of events to buffer in internal and external channels.",
	)
	version = flag.String(
		"version",
		"0.8.2.0",
		"The assumed version of Kafka.",
	)
)

func parseCompression(scheme string) sarama.CompressionCodec {
	switch scheme {
	case "none":
		return sarama.CompressionNone
	case "gzip":
		return sarama.CompressionGZIP
	case "snappy":
		return sarama.CompressionSnappy
	case "lz4":
		return sarama.CompressionLZ4
	default:
		printUsageErrorAndExit(fmt.Sprintf("Unknown -compression: %s", scheme))
	}
	panic("should not happen")
}

func parsePartitioner(scheme string, partition int) sarama.PartitionerConstructor {
	if partition < 0 && scheme == "manual" {
		printUsageErrorAndExit("-partition must not be -1 for -partitioner=manual")
	}
	switch scheme {
	case "manual":
		return sarama.NewManualPartitioner
	case "hash":
		return sarama.NewHashPartitioner
	case "random":
		return sarama.NewRandomPartitioner
	case "roundrobin":
		return sarama.NewRoundRobinPartitioner
	default:
		printUsageErrorAndExit(fmt.Sprintf("Unknown -partitioner: %s", scheme))
	}
	panic("should not happen")
}

func parseVersion(version string) sarama.KafkaVersion {
	result, err := sarama.ParseKafkaVersion(version)
	if err != nil {
		printUsageErrorAndExit(fmt.Sprintf("Unknown -version: %s", version))
	}
	return result
}

func main() {
	flag.Parse()

	if *brokers == "" {
		printUsageErrorAndExit("-brokers is required")
	}
	if *topic == "" {
		printUsageErrorAndExit("-topic is required")
	}
	if *messageLoad <= 0 {
		printUsageErrorAndExit("-message-load must be greater than 0")
	}
	if *messageSize <= 0 {
		printUsageErrorAndExit("-message-size must be greater than 0")
	}

	// Build the producer configuration from the command-line flags.
	config := sarama.NewConfig()
	config.Producer.MaxMessageBytes = *maxMessageBytes
	config.Producer.RequiredAcks = sarama.RequiredAcks(*requiredAcks)
	config.Producer.Timeout = *timeout
	config.Producer.Partitioner = parsePartitioner(*partitioner, *partition)
	config.Producer.Compression = parseCompression(*compression)
	config.Producer.Flush.Frequency = *flushFrequency
	config.Producer.Flush.Bytes = *flushBytes
	config.Producer.Flush.Messages = *flushMessages
	config.Producer.Flush.MaxMessages = *flushMaxMessages
	config.Producer.Retry.Max = *retryMax
	config.Producer.Retry.Backoff = *retryBackoff
	config.Producer.Return.Successes = true
	config.ClientID = *clientID
	config.ChannelBufferSize = *channelBufferSize
	config.Version = parseVersion(*version)
	if err := config.Validate(); err != nil {
		printErrorAndExit(69, "Invalid configuration: %s", err)
	}
	// The async producer provides maximum performance tuning control.
	producer, err := sarama.NewAsyncProducer(strings.Split(*brokers, ","), config)
	if err != nil {
		printErrorAndExit(69, "Failed to create producer: %s", err)
	}
	defer producer.Close()

	// Construct -message-load messages of approximately -message-size random bytes each.
	messages := make([]*sarama.ProducerMessage, *messageLoad)
	for i := 0; i < *messageLoad; i++ {
		payload := make([]byte, *messageSize)
		if _, err = rand.Read(payload); err != nil {
			printErrorAndExit(69, "Failed to generate message payload: %s", err)
		}
		messages[i] = &sarama.ProducerMessage{
			Topic: *topic,
			Value: sarama.ByteEncoder(payload),
		}
	}
	// Wait until all messages have been successfully sent (or an error occurs).
	done := make(chan struct{})
	go func() {
		for i := 0; i < *messageLoad; i++ {
			select {
			case <-producer.Successes():
			case err = <-producer.Errors():
				printErrorAndExit(69, "%s", err)
			}
		}
		done <- struct{}{}
	}()

	// Feed all the messages to the producer at once.
	for _, message := range messages {
		producer.Input() <- message
	}
	<-done
	close(done)

	// TODO: Decide on a better format (or add a flag for options).
	metrics.WriteOnce(config.MetricRegistry, os.Stdout)
}

func printUsageErrorAndExit(message string) {
	fmt.Fprintln(os.Stderr, "ERROR:", message)
	fmt.Fprintln(os.Stderr)
	fmt.Fprintln(os.Stderr, "Available command line options:")
	flag.PrintDefaults()
	os.Exit(64)
}

func printErrorAndExit(code int, format string, values ...interface{}) {
	fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
	fmt.Fprintln(os.Stderr)
	os.Exit(code)
}