main.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. package main
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "os"
  9. "strings"
  10. "time"
  11. "github.com/Shopify/sarama"
  12. metrics "github.com/rcrowley/go-metrics"
  13. )
  14. var (
  15. sync = flag.Bool(
  16. "sync",
  17. false,
  18. "Use a synchronous producer.",
  19. )
  20. messageLoad = flag.Int(
  21. "message-load",
  22. 0,
  23. "REQUIRED: The number of messages to produce to -topic.",
  24. )
  25. messageSize = flag.Int(
  26. "message-size",
  27. 0,
  28. "REQUIRED: The approximate size (in bytes) of each message to produce to -topic.",
  29. )
  30. brokers = flag.String(
  31. "brokers",
  32. "",
  33. "REQUIRED: A comma separated list of broker addresses.",
  34. )
  35. topic = flag.String(
  36. "topic",
  37. "",
  38. "REQUIRED: The topic to run the performance test on.",
  39. )
  40. partition = flag.Int(
  41. "partition",
  42. -1,
  43. "The partition of -topic to run the performance test on.",
  44. )
  45. throughput = flag.Int(
  46. "throughput",
  47. 0,
  48. "The maximum number of messages to send per second (0 for no limit).",
  49. )
  50. maxMessageBytes = flag.Int(
  51. "max-message-bytes",
  52. 1000000,
  53. "The max permitted size of a message.",
  54. )
  55. requiredAcks = flag.Int(
  56. "required-acks",
  57. 1,
  58. "The required number of acks needed from the broker (-1: all, 0: none, 1: local).",
  59. )
  60. timeout = flag.Duration(
  61. "timeout",
  62. 10*time.Second,
  63. "The duration the producer will wait to receive -required-acks.",
  64. )
  65. partitioner = flag.String(
  66. "partitioner",
  67. "roundrobin",
  68. "The partitioning scheme to use (hash, manual, random, roundrobin).",
  69. )
  70. compression = flag.String(
  71. "compression",
  72. "none",
  73. "The compression method to use (none, gzip, snappy, lz4).",
  74. )
  75. flushFrequency = flag.Duration(
  76. "flush-frequency",
  77. 0,
  78. "The best-effort frequency of flushes.",
  79. )
  80. flushBytes = flag.Int(
  81. "flush-bytes",
  82. 0,
  83. "The best-effort number of bytes needed to trigger a flush.",
  84. )
  85. flushMessages = flag.Int(
  86. "flush-messages",
  87. 0,
  88. "The best-effort number of messages needed to trigger a flush.",
  89. )
  90. flushMaxMessages = flag.Int(
  91. "flush-max-messages",
  92. 0,
  93. "The maximum number of messages the producer will send in a single request.",
  94. )
  95. retryMax = flag.Int(
  96. "retry-max",
  97. 3,
  98. "The total number of times to retry sending a message.",
  99. )
  100. retryBackoff = flag.Duration(
  101. "retry-backoff",
  102. 100*time.Millisecond,
  103. "The duration the producer will wait for the cluster to settle between retries.",
  104. )
  105. clientID = flag.String(
  106. "client-id",
  107. "sarama",
  108. "The client ID sent with every request to the brokers.",
  109. )
  110. channelBufferSize = flag.Int(
  111. "channel-buffer-size",
  112. 256,
  113. "The number of events to buffer in internal and external channels.",
  114. )
  115. version = flag.String(
  116. "version",
  117. "0.8.2.0",
  118. "The assumed version of Kafka.",
  119. )
  120. )
  121. func parseCompression(scheme string) sarama.CompressionCodec {
  122. switch scheme {
  123. case "none":
  124. return sarama.CompressionNone
  125. case "gzip":
  126. return sarama.CompressionGZIP
  127. case "snappy":
  128. return sarama.CompressionSnappy
  129. case "lz4":
  130. return sarama.CompressionLZ4
  131. default:
  132. printUsageErrorAndExit(fmt.Sprintf("Unknown -compression: %s", scheme))
  133. }
  134. panic("should not happen")
  135. }
  136. func parsePartitioner(scheme string, partition int) sarama.PartitionerConstructor {
  137. if partition < 0 && scheme == "manual" {
  138. printUsageErrorAndExit("-partition must not be -1 for -partitioning=manual")
  139. }
  140. switch scheme {
  141. case "manual":
  142. return sarama.NewManualPartitioner
  143. case "hash":
  144. return sarama.NewHashPartitioner
  145. case "random":
  146. return sarama.NewRandomPartitioner
  147. case "roundrobin":
  148. return sarama.NewRoundRobinPartitioner
  149. default:
  150. printUsageErrorAndExit(fmt.Sprintf("Unknown -partitioning: %s", scheme))
  151. }
  152. panic("should not happen")
  153. }
  154. func parseVersion(version string) sarama.KafkaVersion {
  155. result, err := sarama.ParseKafkaVersion(version)
  156. if err != nil {
  157. printUsageErrorAndExit(fmt.Sprintf("unknown -version: %s", version))
  158. }
  159. return result
  160. }
  161. func main() {
  162. flag.Parse()
  163. if *brokers == "" {
  164. printUsageErrorAndExit("-brokers is required")
  165. }
  166. if *topic == "" {
  167. printUsageErrorAndExit("-topic is required")
  168. }
  169. if *messageLoad <= 0 {
  170. printUsageErrorAndExit("-message-load must be greater than 0")
  171. }
  172. if *messageSize <= 0 {
  173. printUsageErrorAndExit("-message-size must be greater than 0")
  174. }
  175. config := sarama.NewConfig()
  176. config.Producer.MaxMessageBytes = *maxMessageBytes
  177. config.Producer.RequiredAcks = sarama.RequiredAcks(*requiredAcks)
  178. config.Producer.Timeout = *timeout
  179. config.Producer.Partitioner = parsePartitioner(*partitioner, *partition)
  180. config.Producer.Compression = parseCompression(*compression)
  181. config.Producer.Flush.Frequency = *flushFrequency
  182. config.Producer.Flush.Bytes = *flushBytes
  183. config.Producer.Flush.Messages = *flushMessages
  184. config.Producer.Flush.MaxMessages = *flushMaxMessages
  185. config.Producer.Return.Successes = true
  186. config.ClientID = *clientID
  187. config.ChannelBufferSize = *channelBufferSize
  188. config.Version = parseVersion(*version)
  189. if err := config.Validate(); err != nil {
  190. printErrorAndExit(69, "Invalid configuration: %s", err)
  191. }
  192. // Construct -messageLoad messages of appoximately -messageSize random bytes.
  193. messages := make([]*sarama.ProducerMessage, *messageLoad)
  194. for i := 0; i < *messageLoad; i++ {
  195. payload := make([]byte, *messageSize)
  196. if _, err := rand.Read(payload); err != nil {
  197. printErrorAndExit(69, "Failed to generate message payload: %s", err)
  198. }
  199. messages[i] = &sarama.ProducerMessage{
  200. Topic: *topic,
  201. Partition: int32(*partition),
  202. Value: sarama.ByteEncoder(payload),
  203. }
  204. }
  205. // Print out metrics periodically.
  206. metricsDone := make(chan struct{})
  207. ctx, stopMetrics := context.WithCancel(context.Background())
  208. go func(ctx context.Context) {
  209. t := time.Tick(5 * time.Second)
  210. for {
  211. select {
  212. case <-t:
  213. printMetrics(os.Stdout, config.MetricRegistry)
  214. case <-ctx.Done():
  215. metricsDone <- struct{}{}
  216. return
  217. }
  218. }
  219. }(ctx)
  220. brokers := strings.Split(*brokers, ",")
  221. if *sync {
  222. runSyncProducer(config, brokers, messages, *throughput)
  223. } else {
  224. runAsyncProducer(config, brokers, messages, *throughput)
  225. }
  226. stopMetrics()
  227. <-metricsDone
  228. close(metricsDone)
  229. // Print final metrics.
  230. printMetrics(os.Stdout, config.MetricRegistry)
  231. }
  232. func runAsyncProducer(config *sarama.Config, brokers []string,
  233. messages []*sarama.ProducerMessage, throughput int) {
  234. producer, err := sarama.NewAsyncProducer(brokers, config)
  235. if err != nil {
  236. printErrorAndExit(69, "Failed to create producer: %s", err)
  237. }
  238. defer func() {
  239. if err := producer.Close(); err != nil {
  240. printErrorAndExit(69, "Failed to close producer: %s", err)
  241. }
  242. }()
  243. messagesDone := make(chan struct{})
  244. go func() {
  245. for i := 0; i < *messageLoad; i++ {
  246. select {
  247. case <-producer.Successes():
  248. case err = <-producer.Errors():
  249. printErrorAndExit(69, "%s", err)
  250. }
  251. }
  252. messagesDone <- struct{}{}
  253. }()
  254. if throughput > 0 {
  255. ticker := time.NewTicker(time.Second)
  256. for _, message := range messages {
  257. for i := 0; i < throughput; i++ {
  258. producer.Input() <- message
  259. }
  260. <-ticker.C
  261. }
  262. ticker.Stop()
  263. } else {
  264. for _, message := range messages {
  265. producer.Input() <- message
  266. }
  267. }
  268. <-messagesDone
  269. close(messagesDone)
  270. }
  271. func runSyncProducer(config *sarama.Config, brokers []string,
  272. messages []*sarama.ProducerMessage, throughput int) {
  273. producer, err := sarama.NewSyncProducer(brokers, config)
  274. if err != nil {
  275. printErrorAndExit(69, "Failed to create producer: %s", err)
  276. }
  277. defer func() {
  278. if err := producer.Close(); err != nil {
  279. printErrorAndExit(69, "Failed to close producer: %s", err)
  280. }
  281. }()
  282. if throughput > 0 {
  283. ticker := time.NewTicker(time.Second)
  284. for _, message := range messages {
  285. for i := 0; i < throughput; i++ {
  286. _, _, err = producer.SendMessage(message)
  287. if err != nil {
  288. printErrorAndExit(69, "Failed to send message: %s", err)
  289. }
  290. }
  291. <-ticker.C
  292. }
  293. ticker.Stop()
  294. } else {
  295. for _, message := range messages {
  296. _, _, err = producer.SendMessage(message)
  297. if err != nil {
  298. printErrorAndExit(69, "Failed to send message: %s", err)
  299. }
  300. }
  301. }
  302. }
  303. func printMetrics(w io.Writer, r metrics.Registry) {
  304. recordSendRate := r.Get("record-send-rate").(metrics.Meter).Snapshot()
  305. requestLatency := r.Get("request-latency-in-ms").(metrics.Histogram).Snapshot()
  306. requestLatencyPercentiles := requestLatency.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
  307. fmt.Fprintf(w, "%d records sent, %.1f records/sec (%.2f MB/sec), "+
  308. "%.1f ms avg latency, %.1f ms stddev, %.1f ms 50th, %.1f ms 75th, "+
  309. "%.1f ms 95th, %.1f ms 99th, %.1f ms 99.9th\n",
  310. recordSendRate.Count(),
  311. recordSendRate.RateMean(),
  312. recordSendRate.RateMean()*float64(*messageSize)/1024/1024,
  313. requestLatency.Mean(),
  314. requestLatency.StdDev(),
  315. requestLatencyPercentiles[0],
  316. requestLatencyPercentiles[1],
  317. requestLatencyPercentiles[2],
  318. requestLatencyPercentiles[3],
  319. requestLatencyPercentiles[4],
  320. )
  321. }
  322. func printUsageErrorAndExit(message string) {
  323. fmt.Fprintln(os.Stderr, "ERROR:", message)
  324. fmt.Fprintln(os.Stderr)
  325. fmt.Fprintln(os.Stderr, "Available command line options:")
  326. flag.PrintDefaults()
  327. os.Exit(64)
  328. }
  329. func printErrorAndExit(code int, format string, values ...interface{}) {
  330. fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
  331. fmt.Fprintln(os.Stderr)
  332. os.Exit(code)
  333. }