main.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. package main
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "os"
  9. "strings"
  10. gosync "sync"
  11. "time"
  12. "github.com/Shopify/sarama"
  13. metrics "github.com/rcrowley/go-metrics"
  14. )
  15. var (
  16. sync = flag.Bool(
  17. "sync",
  18. false,
  19. "Use a synchronous producer.",
  20. )
  21. messageLoad = flag.Int(
  22. "message-load",
  23. 0,
  24. "REQUIRED: The number of messages to produce to -topic.",
  25. )
  26. messageSize = flag.Int(
  27. "message-size",
  28. 0,
  29. "REQUIRED: The approximate size (in bytes) of each message to produce to -topic.",
  30. )
  31. brokers = flag.String(
  32. "brokers",
  33. "",
  34. "REQUIRED: A comma separated list of broker addresses.",
  35. )
  36. topic = flag.String(
  37. "topic",
  38. "",
  39. "REQUIRED: The topic to run the performance test on.",
  40. )
  41. partition = flag.Int(
  42. "partition",
  43. -1,
  44. "The partition of -topic to run the performance test on.",
  45. )
  46. throughput = flag.Int(
  47. "throughput",
  48. 0,
  49. "The maximum number of messages to send per second (0 for no limit).",
  50. )
  51. maxMessageBytes = flag.Int(
  52. "max-message-bytes",
  53. 1000000,
  54. "The max permitted size of a message.",
  55. )
  56. requiredAcks = flag.Int(
  57. "required-acks",
  58. 1,
  59. "The required number of acks needed from the broker (-1: all, 0: none, 1: local).",
  60. )
  61. timeout = flag.Duration(
  62. "timeout",
  63. 10*time.Second,
  64. "The duration the producer will wait to receive -required-acks.",
  65. )
  66. partitioner = flag.String(
  67. "partitioner",
  68. "roundrobin",
  69. "The partitioning scheme to use (hash, manual, random, roundrobin).",
  70. )
  71. compression = flag.String(
  72. "compression",
  73. "none",
  74. "The compression method to use (none, gzip, snappy, lz4).",
  75. )
  76. flushFrequency = flag.Duration(
  77. "flush-frequency",
  78. 0,
  79. "The best-effort frequency of flushes.",
  80. )
  81. flushBytes = flag.Int(
  82. "flush-bytes",
  83. 0,
  84. "The best-effort number of bytes needed to trigger a flush.",
  85. )
  86. flushMessages = flag.Int(
  87. "flush-messages",
  88. 0,
  89. "The best-effort number of messages needed to trigger a flush.",
  90. )
  91. flushMaxMessages = flag.Int(
  92. "flush-max-messages",
  93. 0,
  94. "The maximum number of messages the producer will send in a single request.",
  95. )
  96. retryMax = flag.Int(
  97. "retry-max",
  98. 3,
  99. "The total number of times to retry sending a message.",
  100. )
  101. retryBackoff = flag.Duration(
  102. "retry-backoff",
  103. 100*time.Millisecond,
  104. "The duration the producer will wait for the cluster to settle between retries.",
  105. )
  106. clientID = flag.String(
  107. "client-id",
  108. "sarama",
  109. "The client ID sent with every request to the brokers.",
  110. )
  111. channelBufferSize = flag.Int(
  112. "channel-buffer-size",
  113. 256,
  114. "The number of events to buffer in internal and external channels.",
  115. )
  116. routines = flag.Int(
  117. "routines",
  118. 1,
  119. "The number of routines to send the messages from (-sync only).",
  120. )
  121. version = flag.String(
  122. "version",
  123. "0.8.2.0",
  124. "The assumed version of Kafka.",
  125. )
  126. )
  127. func parseCompression(scheme string) sarama.CompressionCodec {
  128. switch scheme {
  129. case "none":
  130. return sarama.CompressionNone
  131. case "gzip":
  132. return sarama.CompressionGZIP
  133. case "snappy":
  134. return sarama.CompressionSnappy
  135. case "lz4":
  136. return sarama.CompressionLZ4
  137. default:
  138. printUsageErrorAndExit(fmt.Sprintf("Unknown -compression: %s", scheme))
  139. }
  140. panic("should not happen")
  141. }
  142. func parsePartitioner(scheme string, partition int) sarama.PartitionerConstructor {
  143. if partition < 0 && scheme == "manual" {
  144. printUsageErrorAndExit("-partition must not be -1 for -partitioning=manual")
  145. }
  146. switch scheme {
  147. case "manual":
  148. return sarama.NewManualPartitioner
  149. case "hash":
  150. return sarama.NewHashPartitioner
  151. case "random":
  152. return sarama.NewRandomPartitioner
  153. case "roundrobin":
  154. return sarama.NewRoundRobinPartitioner
  155. default:
  156. printUsageErrorAndExit(fmt.Sprintf("Unknown -partitioning: %s", scheme))
  157. }
  158. panic("should not happen")
  159. }
  160. func parseVersion(version string) sarama.KafkaVersion {
  161. result, err := sarama.ParseKafkaVersion(version)
  162. if err != nil {
  163. printUsageErrorAndExit(fmt.Sprintf("unknown -version: %s", version))
  164. }
  165. return result
  166. }
  167. func generateMessages(topic string, partition, messageLoad, messageSize int) []*sarama.ProducerMessage {
  168. messages := make([]*sarama.ProducerMessage, messageLoad)
  169. for i := 0; i < messageLoad; i++ {
  170. payload := make([]byte, messageSize)
  171. if _, err := rand.Read(payload); err != nil {
  172. printErrorAndExit(69, "Failed to generate message payload: %s", err)
  173. }
  174. messages[i] = &sarama.ProducerMessage{
  175. Topic: topic,
  176. Partition: int32(partition),
  177. Value: sarama.ByteEncoder(payload),
  178. }
  179. }
  180. return messages
  181. }
  182. func main() {
  183. flag.Parse()
  184. if *brokers == "" {
  185. printUsageErrorAndExit("-brokers is required")
  186. }
  187. if *topic == "" {
  188. printUsageErrorAndExit("-topic is required")
  189. }
  190. if *messageLoad <= 0 {
  191. printUsageErrorAndExit("-message-load must be greater than 0")
  192. }
  193. if *messageSize <= 0 {
  194. printUsageErrorAndExit("-message-size must be greater than 0")
  195. }
  196. if *routines < 1 || *routines > *messageLoad {
  197. printUsageErrorAndExit("-routines must be greater than 0 and less than or equal to -message-load")
  198. }
  199. config := sarama.NewConfig()
  200. config.Producer.MaxMessageBytes = *maxMessageBytes
  201. config.Producer.RequiredAcks = sarama.RequiredAcks(*requiredAcks)
  202. config.Producer.Timeout = *timeout
  203. config.Producer.Partitioner = parsePartitioner(*partitioner, *partition)
  204. config.Producer.Compression = parseCompression(*compression)
  205. config.Producer.Flush.Frequency = *flushFrequency
  206. config.Producer.Flush.Bytes = *flushBytes
  207. config.Producer.Flush.Messages = *flushMessages
  208. config.Producer.Flush.MaxMessages = *flushMaxMessages
  209. config.Producer.Return.Successes = true
  210. config.ClientID = *clientID
  211. config.ChannelBufferSize = *channelBufferSize
  212. config.Version = parseVersion(*version)
  213. if err := config.Validate(); err != nil {
  214. printErrorAndExit(69, "Invalid configuration: %s", err)
  215. }
  216. // Print out metrics periodically.
  217. done := make(chan struct{})
  218. ctx, cancel := context.WithCancel(context.Background())
  219. go func(ctx context.Context) {
  220. defer close(done)
  221. t := time.Tick(5 * time.Second)
  222. for {
  223. select {
  224. case <-t:
  225. printMetrics(os.Stdout, config.MetricRegistry)
  226. case <-ctx.Done():
  227. return
  228. }
  229. }
  230. }(ctx)
  231. brokers := strings.Split(*brokers, ",")
  232. if *sync {
  233. runSyncProducer(*topic, *partition, *messageLoad, *messageSize, *routines,
  234. config, brokers, *throughput)
  235. } else {
  236. runAsyncProducer(*topic, *partition, *messageLoad, *messageSize,
  237. config, brokers, *throughput)
  238. }
  239. cancel()
  240. <-done
  241. // Print final metrics.
  242. printMetrics(os.Stdout, config.MetricRegistry)
  243. }
  244. func runAsyncProducer(topic string, partition, messageLoad, messageSize int,
  245. config *sarama.Config, brokers []string, throughput int) {
  246. producer, err := sarama.NewAsyncProducer(brokers, config)
  247. if err != nil {
  248. printErrorAndExit(69, "Failed to create producer: %s", err)
  249. }
  250. defer func() {
  251. if err := producer.Close(); err != nil {
  252. printErrorAndExit(69, "Failed to close producer: %s", err)
  253. }
  254. }()
  255. messages := generateMessages(topic, partition, messageLoad, messageSize)
  256. messagesDone := make(chan struct{})
  257. go func() {
  258. for i := 0; i < messageLoad; i++ {
  259. select {
  260. case <-producer.Successes():
  261. case err = <-producer.Errors():
  262. printErrorAndExit(69, "%s", err)
  263. }
  264. }
  265. messagesDone <- struct{}{}
  266. }()
  267. if throughput > 0 {
  268. ticker := time.NewTicker(time.Second)
  269. for _, message := range messages {
  270. for i := 0; i < throughput; i++ {
  271. producer.Input() <- message
  272. }
  273. <-ticker.C
  274. }
  275. ticker.Stop()
  276. } else {
  277. for _, message := range messages {
  278. producer.Input() <- message
  279. }
  280. }
  281. <-messagesDone
  282. close(messagesDone)
  283. }
  284. func runSyncProducer(topic string, partition, messageLoad, messageSize, routines int,
  285. config *sarama.Config, brokers []string, throughput int) {
  286. producer, err := sarama.NewSyncProducer(brokers, config)
  287. if err != nil {
  288. printErrorAndExit(69, "Failed to create producer: %s", err)
  289. }
  290. defer func() {
  291. if err := producer.Close(); err != nil {
  292. printErrorAndExit(69, "Failed to close producer: %s", err)
  293. }
  294. }()
  295. messages := make([][]*sarama.ProducerMessage, routines)
  296. for i := 0; i < routines; i++ {
  297. if i == routines-1 {
  298. messages[i] = generateMessages(topic, partition, messageLoad/routines+messageLoad%routines, messageSize)
  299. } else {
  300. messages[i] = generateMessages(topic, partition, messageLoad/routines, messageSize)
  301. }
  302. }
  303. var wg gosync.WaitGroup
  304. if throughput > 0 {
  305. for _, messages := range messages {
  306. messages := messages
  307. wg.Add(1)
  308. go func() {
  309. ticker := time.NewTicker(time.Second)
  310. for _, message := range messages {
  311. for i := 0; i < throughput; i++ {
  312. _, _, err = producer.SendMessage(message)
  313. if err != nil {
  314. printErrorAndExit(69, "Failed to send message: %s", err)
  315. }
  316. }
  317. <-ticker.C
  318. }
  319. ticker.Stop()
  320. wg.Done()
  321. }()
  322. }
  323. } else {
  324. for _, messages := range messages {
  325. messages := messages
  326. wg.Add(1)
  327. go func() {
  328. for _, message := range messages {
  329. _, _, err = producer.SendMessage(message)
  330. if err != nil {
  331. printErrorAndExit(69, "Failed to send message: %s", err)
  332. }
  333. }
  334. wg.Done()
  335. }()
  336. }
  337. }
  338. wg.Wait()
  339. }
  340. func printMetrics(w io.Writer, r metrics.Registry) {
  341. if r.Get("record-send-rate") == nil || r.Get("request-latency-in-ms") == nil {
  342. return
  343. }
  344. recordSendRate := r.Get("record-send-rate").(metrics.Meter).Snapshot()
  345. requestLatency := r.Get("request-latency-in-ms").(metrics.Histogram).Snapshot()
  346. requestLatencyPercentiles := requestLatency.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
  347. fmt.Fprintf(w, "%d records sent, %.1f records/sec (%.2f MB/sec), "+
  348. "%.1f ms avg latency, %.1f ms stddev, %.1f ms 50th, %.1f ms 75th, "+
  349. "%.1f ms 95th, %.1f ms 99th, %.1f ms 99.9th\n",
  350. recordSendRate.Count(),
  351. recordSendRate.RateMean(),
  352. recordSendRate.RateMean()*float64(*messageSize)/1024/1024,
  353. requestLatency.Mean(),
  354. requestLatency.StdDev(),
  355. requestLatencyPercentiles[0],
  356. requestLatencyPercentiles[1],
  357. requestLatencyPercentiles[2],
  358. requestLatencyPercentiles[3],
  359. requestLatencyPercentiles[4],
  360. )
  361. }
  362. func printUsageErrorAndExit(message string) {
  363. fmt.Fprintln(os.Stderr, "ERROR:", message)
  364. fmt.Fprintln(os.Stderr)
  365. fmt.Fprintln(os.Stderr, "Available command line options:")
  366. flag.PrintDefaults()
  367. os.Exit(64)
  368. }
  369. func printErrorAndExit(code int, format string, values ...interface{}) {
  370. fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
  371. fmt.Fprintln(os.Stderr)
  372. os.Exit(code)
  373. }