// main.go — Sarama producer performance test.
package main

import (
	"context"
	"crypto/rand"
	"flag"
	"fmt"
	"io"
	"os"
	"strings"
	"time"

	"github.com/Shopify/sarama"
	metrics "github.com/rcrowley/go-metrics"
)
  14. var (
  15. messageLoad = flag.Int(
  16. "message-load",
  17. 0,
  18. "REQUIRED: The number of messages to produce to -topic.",
  19. )
  20. messageSize = flag.Int(
  21. "message-size",
  22. 0,
  23. "REQUIRED: The approximate size (in bytes) of each message to produce to -topic.",
  24. )
  25. brokers = flag.String(
  26. "brokers",
  27. "",
  28. "REQUIRED: A comma separated list of broker addresses.",
  29. )
  30. topic = flag.String(
  31. "topic",
  32. "",
  33. "REQUIRED: The topic to run the performance test on.",
  34. )
  35. partition = flag.Int(
  36. "partition",
  37. -1,
  38. "The partition of -topic to run the performance test on.",
  39. )
  40. throughput = flag.Int(
  41. "throughput",
  42. 0,
  43. "The maximum number of messages to send per second (0 for no limit).",
  44. )
  45. maxMessageBytes = flag.Int(
  46. "max-message-bytes",
  47. 1000000,
  48. "The max permitted size of a message.",
  49. )
  50. requiredAcks = flag.Int(
  51. "required-acks",
  52. 1,
  53. "The required number of acks needed from the broker (-1: all, 0: none, 1: local).",
  54. )
  55. timeout = flag.Duration(
  56. "timeout",
  57. 10*time.Second,
  58. "The duration the producer will wait to receive -required-acks.",
  59. )
  60. partitioner = flag.String(
  61. "partitioner",
  62. "roundrobin",
  63. "The partitioning scheme to use (hash, manual, random, roundrobin).",
  64. )
  65. compression = flag.String(
  66. "compression",
  67. "none",
  68. "The compression method to use (none, gzip, snappy, lz4).",
  69. )
  70. flushFrequency = flag.Duration(
  71. "flush-frequency",
  72. 0,
  73. "The best-effort frequency of flushes.",
  74. )
  75. flushBytes = flag.Int(
  76. "flush-bytes",
  77. 0,
  78. "The best-effort number of bytes needed to trigger a flush.",
  79. )
  80. flushMessages = flag.Int(
  81. "flush-messages",
  82. 0,
  83. "The best-effort number of messages needed to trigger a flush.",
  84. )
  85. flushMaxMessages = flag.Int(
  86. "flush-max-messages",
  87. 0,
  88. "The maximum number of messages the producer will send in a single request.",
  89. )
  90. retryMax = flag.Int(
  91. "retry-max",
  92. 3,
  93. "The total number of times to retry sending a message.",
  94. )
  95. retryBackoff = flag.Duration(
  96. "retry-backoff",
  97. 100*time.Millisecond,
  98. "The duration the producer will wait for the cluster to settle between retries.",
  99. )
  100. clientID = flag.String(
  101. "client-id",
  102. "sarama",
  103. "The client ID sent with every request to the brokers.",
  104. )
  105. channelBufferSize = flag.Int(
  106. "channel-buffer-size",
  107. 256,
  108. "The number of events to buffer in internal and external channels.",
  109. )
  110. version = flag.String(
  111. "version",
  112. "0.8.2.0",
  113. "The assumed version of Kafka.",
  114. )
  115. )
  116. func parseCompression(scheme string) sarama.CompressionCodec {
  117. switch scheme {
  118. case "none":
  119. return sarama.CompressionNone
  120. case "gzip":
  121. return sarama.CompressionGZIP
  122. case "snappy":
  123. return sarama.CompressionSnappy
  124. case "lz4":
  125. return sarama.CompressionLZ4
  126. default:
  127. printUsageErrorAndExit(fmt.Sprintf("Unknown -compression: %s", scheme))
  128. }
  129. panic("should not happen")
  130. }
  131. func parsePartitioner(scheme string, partition int) sarama.PartitionerConstructor {
  132. if partition < 0 && scheme == "manual" {
  133. printUsageErrorAndExit("-partition must not be -1 for -partitioning=manual")
  134. }
  135. switch scheme {
  136. case "manual":
  137. return sarama.NewManualPartitioner
  138. case "hash":
  139. return sarama.NewHashPartitioner
  140. case "random":
  141. return sarama.NewRandomPartitioner
  142. case "roundrobin":
  143. return sarama.NewRoundRobinPartitioner
  144. default:
  145. printUsageErrorAndExit(fmt.Sprintf("Unknown -partitioning: %s", scheme))
  146. }
  147. panic("should not happen")
  148. }
  149. func parseVersion(version string) sarama.KafkaVersion {
  150. result, err := sarama.ParseKafkaVersion(version)
  151. if err != nil {
  152. printUsageErrorAndExit(fmt.Sprintf("unknown -version: %s", version))
  153. }
  154. return result
  155. }
  156. func main() {
  157. flag.Parse()
  158. if *brokers == "" {
  159. printUsageErrorAndExit("-brokers is required")
  160. }
  161. if *topic == "" {
  162. printUsageErrorAndExit("-topic is required")
  163. }
  164. if *messageLoad <= 0 {
  165. printUsageErrorAndExit("-message-load must be greater than 0")
  166. }
  167. if *messageSize <= 0 {
  168. printUsageErrorAndExit("-message-size must be greater than 0")
  169. }
  170. config := sarama.NewConfig()
  171. config.Producer.MaxMessageBytes = *maxMessageBytes
  172. config.Producer.RequiredAcks = sarama.RequiredAcks(*requiredAcks)
  173. config.Producer.Timeout = *timeout
  174. config.Producer.Partitioner = parsePartitioner(*partitioner, *partition)
  175. config.Producer.Compression = parseCompression(*compression)
  176. config.Producer.Flush.Frequency = *flushFrequency
  177. config.Producer.Flush.Bytes = *flushBytes
  178. config.Producer.Flush.Messages = *flushMessages
  179. config.Producer.Flush.MaxMessages = *flushMaxMessages
  180. config.Producer.Return.Successes = true
  181. config.ClientID = *clientID
  182. config.ChannelBufferSize = *channelBufferSize
  183. config.Version = parseVersion(*version)
  184. if err := config.Validate(); err != nil {
  185. printErrorAndExit(69, "Invalid configuration: %s", err)
  186. }
  187. // The async producer provides maximum performance tuning control.
  188. producer, err := sarama.NewAsyncProducer(strings.Split(*brokers, ","), config)
  189. if err != nil {
  190. printErrorAndExit(69, "Failed to create producer: %s", err)
  191. }
  192. defer func() {
  193. if err := producer.Close(); err != nil {
  194. printErrorAndExit(69, "Failed to close producer: %s", err)
  195. }
  196. }()
  197. // Construct -messageLoad messages of appoximately -messageSize random bytes.
  198. messages := make([]*sarama.ProducerMessage, *messageLoad)
  199. for i := 0; i < *messageLoad; i++ {
  200. payload := make([]byte, *messageSize)
  201. if _, err = rand.Read(payload); err != nil {
  202. printErrorAndExit(69, "Failed to generate message payload: %s", err)
  203. }
  204. messages[i] = &sarama.ProducerMessage{
  205. Topic: *topic,
  206. Value: sarama.ByteEncoder(payload),
  207. }
  208. }
  209. // Wait until all messages have been successfully sent (or an error occurs).
  210. messagesDone := make(chan struct{})
  211. go func() {
  212. for i := 0; i < *messageLoad; i++ {
  213. select {
  214. case <-producer.Successes():
  215. case err = <-producer.Errors():
  216. printErrorAndExit(69, "%s", err)
  217. }
  218. }
  219. messagesDone <- struct{}{}
  220. }()
  221. // Print out metrics periodically.
  222. metricsDone := make(chan struct{})
  223. ctx, stopMetrics := context.WithCancel(context.Background())
  224. go func(ctx context.Context) {
  225. t := time.Tick(5 * time.Second)
  226. for {
  227. select {
  228. case <-t:
  229. printMetrics(os.Stdout, config.MetricRegistry)
  230. case <-ctx.Done():
  231. metricsDone <- struct{}{}
  232. return
  233. }
  234. }
  235. }(ctx)
  236. // Produce messages at approximately -throughput messages per second.
  237. if *throughput > 0 {
  238. ticker := time.NewTicker(time.Second)
  239. for _, message := range messages {
  240. for i := 0; i < *throughput; i++ {
  241. producer.Input() <- message
  242. }
  243. <-ticker.C
  244. }
  245. ticker.Stop()
  246. } else {
  247. for _, message := range messages {
  248. producer.Input() <- message
  249. }
  250. }
  251. <-messagesDone
  252. close(messagesDone)
  253. stopMetrics()
  254. <-metricsDone
  255. close(metricsDone)
  256. // Print final metrics.
  257. printMetrics(os.Stdout, config.MetricRegistry)
  258. }
  259. func printMetrics(w io.Writer, r metrics.Registry) {
  260. recordSendRate := r.Get("record-send-rate").(metrics.Meter).Snapshot()
  261. requestLatency := r.Get("request-latency-in-ms").(metrics.Histogram).Snapshot()
  262. requestLatencyPercentiles := requestLatency.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
  263. fmt.Fprintf(w, "%d records sent, %.1f records/sec (%.2f MB/sec), "+
  264. "%.1f ms avg latency, %.1f ms stddev, %.1f ms 50th, %.1f ms 75th, "+
  265. "%.1f ms 95th, %.1f ms 99th, %.1f ms 99.9th\n",
  266. recordSendRate.Count(),
  267. recordSendRate.RateMean(),
  268. recordSendRate.RateMean()*float64(*messageSize)/1024/1024,
  269. requestLatency.Mean(),
  270. requestLatency.StdDev(),
  271. requestLatencyPercentiles[0],
  272. requestLatencyPercentiles[1],
  273. requestLatencyPercentiles[2],
  274. requestLatencyPercentiles[3],
  275. requestLatencyPercentiles[4],
  276. )
  277. }
  278. func printUsageErrorAndExit(message string) {
  279. fmt.Fprintln(os.Stderr, "ERROR:", message)
  280. fmt.Fprintln(os.Stderr)
  281. fmt.Fprintln(os.Stderr, "Available command line options:")
  282. flag.PrintDefaults()
  283. os.Exit(64)
  284. }
  285. func printErrorAndExit(code int, format string, values ...interface{}) {
  286. fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
  287. fmt.Fprintln(os.Stderr)
  288. os.Exit(code)
  289. }