main.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. package main
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "crypto/x509"
  6. "flag"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "log"
  11. "os"
  12. "strings"
  13. gosync "sync"
  14. "time"
  15. metrics "github.com/rcrowley/go-metrics"
  16. "github.com/Shopify/sarama"
  17. "github.com/Shopify/sarama/tools/tls"
  18. )
  19. var (
  20. sync = flag.Bool(
  21. "sync",
  22. false,
  23. "Use a synchronous producer.",
  24. )
  25. messageLoad = flag.Int(
  26. "message-load",
  27. 0,
  28. "REQUIRED: The number of messages to produce to -topic.",
  29. )
  30. messageSize = flag.Int(
  31. "message-size",
  32. 0,
  33. "REQUIRED: The approximate size (in bytes) of each message to produce to -topic.",
  34. )
  35. brokers = flag.String(
  36. "brokers",
  37. "",
  38. "REQUIRED: A comma separated list of broker addresses.",
  39. )
  40. securityProtocol = flag.String(
  41. "security-protocol",
  42. "PLAINTEXT",
  43. "The name of the security protocol to talk to Kafka (PLAINTEXT, SSL) (default: PLAINTEXT).",
  44. )
  45. tlsRootCACerts = flag.String(
  46. "tls-ca-certs",
  47. "",
  48. "The path to a file that contains a set of root certificate authorities in PEM format "+
  49. "to trust when verifying broker certificates when -security-protocol=SSL "+
  50. "(leave empty to use the host's root CA set).",
  51. )
  52. tlsClientCert = flag.String(
  53. "tls-client-cert",
  54. "",
  55. "The path to a file that contains the client certificate to send to the broker "+
  56. "in PEM format if client authentication is required when -security-protocol=SSL "+
  57. "(leave empty to disable client authentication).",
  58. )
  59. tlsClientKey = flag.String(
  60. "tls-client-key",
  61. "",
  62. "The path to a file that contains the client private key linked to the client certificate "+
  63. "in PEM format when -security-protocol=SSL (REQUIRED if tls-client-cert is provided).",
  64. )
  65. topic = flag.String(
  66. "topic",
  67. "",
  68. "REQUIRED: The topic to run the performance test on.",
  69. )
  70. partition = flag.Int(
  71. "partition",
  72. -1,
  73. "The partition of -topic to run the performance test on.",
  74. )
  75. throughput = flag.Int(
  76. "throughput",
  77. 0,
  78. "The maximum number of messages to send per second (0 for no limit).",
  79. )
  80. maxOpenRequests = flag.Int(
  81. "max-open-requests",
  82. 5,
  83. "The maximum number of unacknowledged requests the client will send on a single connection before blocking (default: 5).",
  84. )
  85. maxMessageBytes = flag.Int(
  86. "max-message-bytes",
  87. 1000000,
  88. "The max permitted size of a message.",
  89. )
  90. requiredAcks = flag.Int(
  91. "required-acks",
  92. 1,
  93. "The required number of acks needed from the broker (-1: all, 0: none, 1: local).",
  94. )
  95. timeout = flag.Duration(
  96. "timeout",
  97. 10*time.Second,
  98. "The duration the producer will wait to receive -required-acks.",
  99. )
  100. partitioner = flag.String(
  101. "partitioner",
  102. "roundrobin",
  103. "The partitioning scheme to use (hash, manual, random, roundrobin).",
  104. )
  105. compression = flag.String(
  106. "compression",
  107. "none",
  108. "The compression method to use (none, gzip, snappy, lz4).",
  109. )
  110. flushFrequency = flag.Duration(
  111. "flush-frequency",
  112. 0,
  113. "The best-effort frequency of flushes.",
  114. )
  115. flushBytes = flag.Int(
  116. "flush-bytes",
  117. 0,
  118. "The best-effort number of bytes needed to trigger a flush.",
  119. )
  120. flushMessages = flag.Int(
  121. "flush-messages",
  122. 0,
  123. "The best-effort number of messages needed to trigger a flush.",
  124. )
  125. flushMaxMessages = flag.Int(
  126. "flush-max-messages",
  127. 0,
  128. "The maximum number of messages the producer will send in a single request.",
  129. )
  130. clientID = flag.String(
  131. "client-id",
  132. "sarama",
  133. "The client ID sent with every request to the brokers.",
  134. )
  135. channelBufferSize = flag.Int(
  136. "channel-buffer-size",
  137. 256,
  138. "The number of events to buffer in internal and external channels.",
  139. )
  140. routines = flag.Int(
  141. "routines",
  142. 1,
  143. "The number of routines to send the messages from (-sync only).",
  144. )
  145. version = flag.String(
  146. "version",
  147. "0.8.2.0",
  148. "The assumed version of Kafka.",
  149. )
  150. verbose = flag.Bool(
  151. "verbose",
  152. false,
  153. "Turn on sarama logging to stderr",
  154. )
  155. )
  156. func parseCompression(scheme string) sarama.CompressionCodec {
  157. switch scheme {
  158. case "none":
  159. return sarama.CompressionNone
  160. case "gzip":
  161. return sarama.CompressionGZIP
  162. case "snappy":
  163. return sarama.CompressionSnappy
  164. case "lz4":
  165. return sarama.CompressionLZ4
  166. default:
  167. printUsageErrorAndExit(fmt.Sprintf("Unknown -compression: %s", scheme))
  168. }
  169. panic("should not happen")
  170. }
  171. func parsePartitioner(scheme string, partition int) sarama.PartitionerConstructor {
  172. if partition < 0 && scheme == "manual" {
  173. printUsageErrorAndExit("-partition must not be -1 for -partitioning=manual")
  174. }
  175. switch scheme {
  176. case "manual":
  177. return sarama.NewManualPartitioner
  178. case "hash":
  179. return sarama.NewHashPartitioner
  180. case "random":
  181. return sarama.NewRandomPartitioner
  182. case "roundrobin":
  183. return sarama.NewRoundRobinPartitioner
  184. default:
  185. printUsageErrorAndExit(fmt.Sprintf("Unknown -partitioning: %s", scheme))
  186. }
  187. panic("should not happen")
  188. }
  189. func parseVersion(version string) sarama.KafkaVersion {
  190. result, err := sarama.ParseKafkaVersion(version)
  191. if err != nil {
  192. printUsageErrorAndExit(fmt.Sprintf("unknown -version: %s", version))
  193. }
  194. return result
  195. }
  196. func generateMessages(topic string, partition, messageLoad, messageSize int) []*sarama.ProducerMessage {
  197. messages := make([]*sarama.ProducerMessage, messageLoad)
  198. for i := 0; i < messageLoad; i++ {
  199. payload := make([]byte, messageSize)
  200. if _, err := rand.Read(payload); err != nil {
  201. printErrorAndExit(69, "Failed to generate message payload: %s", err)
  202. }
  203. messages[i] = &sarama.ProducerMessage{
  204. Topic: topic,
  205. Partition: int32(partition),
  206. Value: sarama.ByteEncoder(payload),
  207. }
  208. }
  209. return messages
  210. }
  211. func main() {
  212. flag.Parse()
  213. if *brokers == "" {
  214. printUsageErrorAndExit("-brokers is required")
  215. }
  216. if *topic == "" {
  217. printUsageErrorAndExit("-topic is required")
  218. }
  219. if *messageLoad <= 0 {
  220. printUsageErrorAndExit("-message-load must be greater than 0")
  221. }
  222. if *messageSize <= 0 {
  223. printUsageErrorAndExit("-message-size must be greater than 0")
  224. }
  225. if *routines < 1 || *routines > *messageLoad {
  226. printUsageErrorAndExit("-routines must be greater than 0 and less than or equal to -message-load")
  227. }
  228. if *securityProtocol != "PLAINTEXT" && *securityProtocol != "SSL" {
  229. printUsageErrorAndExit(fmt.Sprintf("-security-protocol %q is not supported", *securityProtocol))
  230. }
  231. if *verbose {
  232. sarama.Logger = log.New(os.Stderr, "", log.LstdFlags)
  233. }
  234. config := sarama.NewConfig()
  235. config.Net.MaxOpenRequests = *maxOpenRequests
  236. config.Producer.MaxMessageBytes = *maxMessageBytes
  237. config.Producer.RequiredAcks = sarama.RequiredAcks(*requiredAcks)
  238. config.Producer.Timeout = *timeout
  239. config.Producer.Partitioner = parsePartitioner(*partitioner, *partition)
  240. config.Producer.Compression = parseCompression(*compression)
  241. config.Producer.Flush.Frequency = *flushFrequency
  242. config.Producer.Flush.Bytes = *flushBytes
  243. config.Producer.Flush.Messages = *flushMessages
  244. config.Producer.Flush.MaxMessages = *flushMaxMessages
  245. config.Producer.Return.Successes = true
  246. config.ClientID = *clientID
  247. config.ChannelBufferSize = *channelBufferSize
  248. config.Version = parseVersion(*version)
  249. if *securityProtocol == "SSL" {
  250. tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
  251. if err != nil {
  252. printErrorAndExit(69, "failed to load client certificate from: %s and private key from: %s: %v",
  253. *tlsClientCert, *tlsClientKey, err)
  254. }
  255. if *tlsRootCACerts != "" {
  256. rootCAsBytes, err := ioutil.ReadFile(*tlsRootCACerts)
  257. if err != nil {
  258. printErrorAndExit(69, "failed to read root CA certificates: %v", err)
  259. }
  260. certPool := x509.NewCertPool()
  261. if !certPool.AppendCertsFromPEM(rootCAsBytes) {
  262. printErrorAndExit(69, "failed to load root CA certificates from file: %s", *tlsRootCACerts)
  263. }
  264. // Use specific root CA set vs the host's set
  265. tlsConfig.RootCAs = certPool
  266. }
  267. config.Net.TLS.Enable = true
  268. config.Net.TLS.Config = tlsConfig
  269. }
  270. if err := config.Validate(); err != nil {
  271. printErrorAndExit(69, "Invalid configuration: %s", err)
  272. }
  273. // Print out metrics periodically.
  274. done := make(chan struct{})
  275. ctx, cancel := context.WithCancel(context.Background())
  276. go func(ctx context.Context) {
  277. defer close(done)
  278. t := time.Tick(5 * time.Second)
  279. for {
  280. select {
  281. case <-t:
  282. printMetrics(os.Stdout, config.MetricRegistry)
  283. case <-ctx.Done():
  284. return
  285. }
  286. }
  287. }(ctx)
  288. brokers := strings.Split(*brokers, ",")
  289. if *sync {
  290. runSyncProducer(*topic, *partition, *messageLoad, *messageSize, *routines,
  291. config, brokers, *throughput)
  292. } else {
  293. runAsyncProducer(*topic, *partition, *messageLoad, *messageSize,
  294. config, brokers, *throughput)
  295. }
  296. cancel()
  297. <-done
  298. // Print final metrics.
  299. printMetrics(os.Stdout, config.MetricRegistry)
  300. }
  301. func runAsyncProducer(topic string, partition, messageLoad, messageSize int,
  302. config *sarama.Config, brokers []string, throughput int) {
  303. producer, err := sarama.NewAsyncProducer(brokers, config)
  304. if err != nil {
  305. printErrorAndExit(69, "Failed to create producer: %s", err)
  306. }
  307. defer func() {
  308. if err := producer.Close(); err != nil {
  309. printErrorAndExit(69, "Failed to close producer: %s", err)
  310. }
  311. }()
  312. messages := generateMessages(topic, partition, messageLoad, messageSize)
  313. messagesDone := make(chan struct{})
  314. go func() {
  315. for i := 0; i < messageLoad; i++ {
  316. select {
  317. case <-producer.Successes():
  318. case err = <-producer.Errors():
  319. printErrorAndExit(69, "%s", err)
  320. }
  321. }
  322. messagesDone <- struct{}{}
  323. }()
  324. if throughput > 0 {
  325. ticker := time.NewTicker(time.Second)
  326. for _, message := range messages {
  327. for i := 0; i < throughput; i++ {
  328. producer.Input() <- message
  329. }
  330. <-ticker.C
  331. }
  332. ticker.Stop()
  333. } else {
  334. for _, message := range messages {
  335. producer.Input() <- message
  336. }
  337. }
  338. <-messagesDone
  339. close(messagesDone)
  340. }
  341. func runSyncProducer(topic string, partition, messageLoad, messageSize, routines int,
  342. config *sarama.Config, brokers []string, throughput int) {
  343. producer, err := sarama.NewSyncProducer(brokers, config)
  344. if err != nil {
  345. printErrorAndExit(69, "Failed to create producer: %s", err)
  346. }
  347. defer func() {
  348. if err := producer.Close(); err != nil {
  349. printErrorAndExit(69, "Failed to close producer: %s", err)
  350. }
  351. }()
  352. messages := make([][]*sarama.ProducerMessage, routines)
  353. for i := 0; i < routines; i++ {
  354. if i == routines-1 {
  355. messages[i] = generateMessages(topic, partition, messageLoad/routines+messageLoad%routines, messageSize)
  356. } else {
  357. messages[i] = generateMessages(topic, partition, messageLoad/routines, messageSize)
  358. }
  359. }
  360. var wg gosync.WaitGroup
  361. if throughput > 0 {
  362. for _, messages := range messages {
  363. messages := messages
  364. wg.Add(1)
  365. go func() {
  366. ticker := time.NewTicker(time.Second)
  367. for _, message := range messages {
  368. for i := 0; i < throughput; i++ {
  369. _, _, err = producer.SendMessage(message)
  370. if err != nil {
  371. printErrorAndExit(69, "Failed to send message: %s", err)
  372. }
  373. }
  374. <-ticker.C
  375. }
  376. ticker.Stop()
  377. wg.Done()
  378. }()
  379. }
  380. } else {
  381. for _, messages := range messages {
  382. messages := messages
  383. wg.Add(1)
  384. go func() {
  385. for _, message := range messages {
  386. _, _, err = producer.SendMessage(message)
  387. if err != nil {
  388. printErrorAndExit(69, "Failed to send message: %s", err)
  389. }
  390. }
  391. wg.Done()
  392. }()
  393. }
  394. }
  395. wg.Wait()
  396. }
  397. func printMetrics(w io.Writer, r metrics.Registry) {
  398. recordSendRateMetric := r.Get("record-send-rate")
  399. requestLatencyMetric := r.Get("request-latency-in-ms")
  400. outgoingByteRateMetric := r.Get("outgoing-byte-rate")
  401. requestsInFlightMetric := r.Get("requests-in-flight")
  402. if recordSendRateMetric == nil || requestLatencyMetric == nil || outgoingByteRateMetric == nil ||
  403. requestsInFlightMetric == nil {
  404. return
  405. }
  406. recordSendRate := recordSendRateMetric.(metrics.Meter).Snapshot()
  407. requestLatency := requestLatencyMetric.(metrics.Histogram).Snapshot()
  408. requestLatencyPercentiles := requestLatency.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
  409. outgoingByteRate := outgoingByteRateMetric.(metrics.Meter).Snapshot()
  410. requestsInFlight := requestsInFlightMetric.(metrics.Counter).Count()
  411. fmt.Fprintf(w, "%d records sent, %.1f records/sec (%.2f MiB/sec ingress, %.2f MiB/sec egress), "+
  412. "%.1f ms avg latency, %.1f ms stddev, %.1f ms 50th, %.1f ms 75th, "+
  413. "%.1f ms 95th, %.1f ms 99th, %.1f ms 99.9th, %d total req. in flight\n",
  414. recordSendRate.Count(),
  415. recordSendRate.RateMean(),
  416. recordSendRate.RateMean()*float64(*messageSize)/1024/1024,
  417. outgoingByteRate.RateMean()/1024/1024,
  418. requestLatency.Mean(),
  419. requestLatency.StdDev(),
  420. requestLatencyPercentiles[0],
  421. requestLatencyPercentiles[1],
  422. requestLatencyPercentiles[2],
  423. requestLatencyPercentiles[3],
  424. requestLatencyPercentiles[4],
  425. requestsInFlight,
  426. )
  427. }
  428. func printUsageErrorAndExit(message string) {
  429. fmt.Fprintln(os.Stderr, "ERROR:", message)
  430. fmt.Fprintln(os.Stderr)
  431. fmt.Fprintln(os.Stderr, "Available command line options:")
  432. flag.PrintDefaults()
  433. os.Exit(64)
  434. }
  435. func printErrorAndExit(code int, format string, values ...interface{}) {
  436. fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
  437. fmt.Fprintln(os.Stderr)
  438. os.Exit(code)
  439. }