functional_producer_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. package sarama
  2. import (
  3. "fmt"
  4. "os"
  5. "strings"
  6. "sync"
  7. "testing"
  8. "time"
  9. toxiproxy "github.com/Shopify/toxiproxy/client"
  10. "github.com/rcrowley/go-metrics"
  11. )
  // TestBatchSize is the number of messages each functional producer test
  // publishes; the metric validators also use it as the expected record count.
  const TestBatchSize = 1000
  13. func TestFuncProducing(t *testing.T) {
  14. config := NewConfig()
  15. testProducingMessages(t, config)
  16. }
  17. func TestFuncProducingGzip(t *testing.T) {
  18. config := NewConfig()
  19. config.Producer.Compression = CompressionGZIP
  20. testProducingMessages(t, config)
  21. }
  22. func TestFuncProducingSnappy(t *testing.T) {
  23. config := NewConfig()
  24. config.Producer.Compression = CompressionSnappy
  25. testProducingMessages(t, config)
  26. }
  27. func TestFuncProducingZstd(t *testing.T) {
  28. config := NewConfig()
  29. config.Version = V2_1_0_0
  30. config.Producer.Compression = CompressionZSTD
  31. testProducingMessages(t, config)
  32. }
  33. func TestFuncProducingNoResponse(t *testing.T) {
  34. config := NewConfig()
  35. config.Producer.RequiredAcks = NoResponse
  36. testProducingMessages(t, config)
  37. }
  38. func TestFuncProducingFlushing(t *testing.T) {
  39. config := NewConfig()
  40. config.Producer.Flush.Messages = TestBatchSize / 8
  41. config.Producer.Flush.Frequency = 250 * time.Millisecond
  42. testProducingMessages(t, config)
  43. }
  44. func TestFuncMultiPartitionProduce(t *testing.T) {
  45. setupFunctionalTest(t)
  46. defer teardownFunctionalTest(t)
  47. config := NewConfig()
  48. config.ChannelBufferSize = 20
  49. config.Producer.Flush.Frequency = 50 * time.Millisecond
  50. config.Producer.Flush.Messages = 200
  51. config.Producer.Return.Successes = true
  52. producer, err := NewSyncProducer(kafkaBrokers, config)
  53. if err != nil {
  54. t.Fatal(err)
  55. }
  56. var wg sync.WaitGroup
  57. wg.Add(TestBatchSize)
  58. for i := 1; i <= TestBatchSize; i++ {
  59. go func(i int) {
  60. defer wg.Done()
  61. msg := &ProducerMessage{Topic: "test.64", Key: nil, Value: StringEncoder(fmt.Sprintf("hur %d", i))}
  62. if _, _, err := producer.SendMessage(msg); err != nil {
  63. t.Error(i, err)
  64. }
  65. }(i)
  66. }
  67. wg.Wait()
  68. if err := producer.Close(); err != nil {
  69. t.Error(err)
  70. }
  71. }
  72. func TestFuncProducingToInvalidTopic(t *testing.T) {
  73. setupFunctionalTest(t)
  74. defer teardownFunctionalTest(t)
  75. producer, err := NewSyncProducer(kafkaBrokers, nil)
  76. if err != nil {
  77. t.Fatal(err)
  78. }
  79. if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
  80. t.Error("Expected ErrUnknownTopicOrPartition, found", err)
  81. }
  82. if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
  83. t.Error("Expected ErrUnknownTopicOrPartition, found", err)
  84. }
  85. safeClose(t, producer)
  86. }
  87. func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
  88. setupFunctionalTest(t)
  89. defer teardownFunctionalTest(t)
  90. config := NewConfig()
  91. config.Producer.Flush.Frequency = 250 * time.Millisecond
  92. config.Producer.Idempotent = true
  93. config.Producer.Timeout = 500 * time.Millisecond
  94. config.Producer.Retry.Max = 1
  95. config.Producer.Retry.Backoff = 500 * time.Millisecond
  96. config.Producer.Return.Successes = true
  97. config.Producer.Return.Errors = true
  98. config.Producer.RequiredAcks = WaitForAll
  99. config.Net.MaxOpenRequests = 1
  100. config.Version = V0_11_0_0
  101. producer, err := NewSyncProducer(kafkaBrokers, config)
  102. if err != nil {
  103. t.Fatal(err)
  104. }
  105. defer safeClose(t, producer)
  106. // Successfully publish a few messages
  107. for i := 0; i < 10; i++ {
  108. _, _, err = producer.SendMessage(&ProducerMessage{
  109. Topic: "test.1",
  110. Value: StringEncoder(fmt.Sprintf("%d message", i)),
  111. })
  112. if err != nil {
  113. t.Fatal(err)
  114. }
  115. }
  116. // break the brokers.
  117. for proxyName, proxy := range Proxies {
  118. if !strings.Contains(proxyName, "kafka") {
  119. continue
  120. }
  121. if err := proxy.Disable(); err != nil {
  122. t.Fatal(err)
  123. }
  124. }
  125. // This should fail hard now
  126. for i := 10; i < 20; i++ {
  127. _, _, err = producer.SendMessage(&ProducerMessage{
  128. Topic: "test.1",
  129. Value: StringEncoder(fmt.Sprintf("%d message", i)),
  130. })
  131. if err == nil {
  132. t.Fatal(err)
  133. }
  134. }
  135. // Now bring the proxy back up
  136. for proxyName, proxy := range Proxies {
  137. if !strings.Contains(proxyName, "kafka") {
  138. continue
  139. }
  140. if err := proxy.Enable(); err != nil {
  141. t.Fatal(err)
  142. }
  143. }
  144. // We should be able to publish again (once everything calms down)
  145. // (otherwise it times out)
  146. for {
  147. _, _, err = producer.SendMessage(&ProducerMessage{
  148. Topic: "test.1",
  149. Value: StringEncoder("comeback message"),
  150. })
  151. if err == nil {
  152. break
  153. }
  154. }
  155. }
  156. func testProducingMessages(t *testing.T, config *Config) {
  157. setupFunctionalTest(t)
  158. defer teardownFunctionalTest(t)
  159. // Configure some latency in order to properly validate the request latency metric
  160. for _, proxy := range Proxies {
  161. if _, err := proxy.AddToxic("", "latency", "", 1, toxiproxy.Attributes{"latency": 10}); err != nil {
  162. t.Fatal("Unable to configure latency toxicity", err)
  163. }
  164. }
  165. config.Producer.Return.Successes = true
  166. config.Consumer.Return.Errors = true
  167. client, err := NewClient(kafkaBrokers, config)
  168. if err != nil {
  169. t.Fatal(err)
  170. }
  171. // Keep in mind the current offset
  172. initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
  173. if err != nil {
  174. t.Fatal(err)
  175. }
  176. producer, err := NewAsyncProducerFromClient(client)
  177. if err != nil {
  178. t.Fatal(err)
  179. }
  180. expectedResponses := TestBatchSize
  181. for i := 1; i <= TestBatchSize; {
  182. msg := &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
  183. select {
  184. case producer.Input() <- msg:
  185. i++
  186. case ret := <-producer.Errors():
  187. t.Fatal(ret.Err)
  188. case <-producer.Successes():
  189. expectedResponses--
  190. }
  191. }
  192. for expectedResponses > 0 {
  193. select {
  194. case ret := <-producer.Errors():
  195. t.Fatal(ret.Err)
  196. case <-producer.Successes():
  197. expectedResponses--
  198. }
  199. }
  200. safeClose(t, producer)
  201. // Validate producer metrics before using the consumer minus the offset request
  202. validateMetrics(t, client)
  203. master, err := NewConsumerFromClient(client)
  204. if err != nil {
  205. t.Fatal(err)
  206. }
  207. consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
  208. if err != nil {
  209. t.Fatal(err)
  210. }
  211. for i := 1; i <= TestBatchSize; i++ {
  212. select {
  213. case <-time.After(10 * time.Second):
  214. t.Fatal("Not received any more events in the last 10 seconds.")
  215. case err := <-consumer.Errors():
  216. t.Error(err)
  217. case message := <-consumer.Messages():
  218. if string(message.Value) != fmt.Sprintf("testing %d", i) {
  219. t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
  220. }
  221. }
  222. }
  223. safeClose(t, consumer)
  224. safeClose(t, client)
  225. }
  226. func validateMetrics(t *testing.T, client Client) {
  227. // Get the broker used by test1 topic
  228. var broker *Broker
  229. if partitions, err := client.Partitions("test.1"); err != nil {
  230. t.Error(err)
  231. } else {
  232. for _, partition := range partitions {
  233. if b, err := client.Leader("test.1", partition); err != nil {
  234. t.Error(err)
  235. } else {
  236. if broker != nil && b != broker {
  237. t.Fatal("Expected only one broker, got at least 2")
  238. }
  239. broker = b
  240. }
  241. }
  242. }
  243. metricValidators := newMetricValidators()
  244. noResponse := client.Config().Producer.RequiredAcks == NoResponse
  245. compressionEnabled := client.Config().Producer.Compression != CompressionNone
  246. // We are adding 10ms of latency to all requests with toxiproxy
  247. minRequestLatencyInMs := 10
  248. if noResponse {
  249. // but when we do not wait for a response it can be less than 1ms
  250. minRequestLatencyInMs = 0
  251. }
  252. // We read at least 1 byte from the broker
  253. metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1))
  254. // in at least 3 global requests (1 for metadata request, 1 for offset request and N for produce request)
  255. metricValidators.register(minCountMeterValidator("request-rate", 3))
  256. metricValidators.register(minCountHistogramValidator("request-size", 3))
  257. metricValidators.register(minValHistogramValidator("request-size", 1))
  258. // and at least 2 requests to the registered broker (offset + produces)
  259. metricValidators.registerForBroker(broker, minCountMeterValidator("request-rate", 2))
  260. metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2))
  261. metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1))
  262. metricValidators.registerForBroker(broker, minValHistogramValidator("request-latency-in-ms", minRequestLatencyInMs))
  263. // We send at least 1 batch
  264. metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("batch-size", 1))
  265. metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("batch-size", 1))
  266. if compressionEnabled {
  267. // We record compression ratios between [0.50,-10.00] (50-1000 with a histogram) for at least one "fake" record
  268. metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 1))
  269. metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50))
  270. metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 1000))
  271. } else {
  272. // We record compression ratios of 1.00 (100 with a histogram) for every TestBatchSize record
  273. metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize))
  274. metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 100))
  275. metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 100))
  276. }
  277. // We send exactly TestBatchSize messages
  278. metricValidators.registerForGlobalAndTopic("test_1", countMeterValidator("record-send-rate", TestBatchSize))
  279. // We send at least one record per request
  280. metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("records-per-request", 1))
  281. metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("records-per-request", 1))
  282. // We receive at least 1 byte from the broker
  283. metricValidators.registerForAllBrokers(broker, minCountMeterValidator("outgoing-byte-rate", 1))
  284. if noResponse {
  285. // in exactly 2 global responses (metadata + offset)
  286. metricValidators.register(countMeterValidator("response-rate", 2))
  287. metricValidators.register(minCountHistogramValidator("response-size", 2))
  288. // and exactly 1 offset response for the registered broker
  289. metricValidators.registerForBroker(broker, countMeterValidator("response-rate", 1))
  290. metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 1))
  291. metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
  292. } else {
  293. // in at least 3 global responses (metadata + offset + produces)
  294. metricValidators.register(minCountMeterValidator("response-rate", 3))
  295. metricValidators.register(minCountHistogramValidator("response-size", 3))
  296. // and at least 2 for the registered broker
  297. metricValidators.registerForBroker(broker, minCountMeterValidator("response-rate", 2))
  298. metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 2))
  299. metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
  300. }
  301. // There should be no requests in flight anymore
  302. metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0))
  303. // Run the validators
  304. metricValidators.run(t, client.Config().MetricRegistry)
  305. }
  306. // Benchmarks
  307. func BenchmarkProducerSmall(b *testing.B) {
  308. benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 128)))
  309. }
  310. func BenchmarkProducerMedium(b *testing.B) {
  311. benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 1024)))
  312. }
  313. func BenchmarkProducerLarge(b *testing.B) {
  314. benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 8192)))
  315. }
  316. func BenchmarkProducerSmallSinglePartition(b *testing.B) {
  317. benchmarkProducer(b, nil, "test.1", ByteEncoder(make([]byte, 128)))
  318. }
  319. func BenchmarkProducerMediumSnappy(b *testing.B) {
  320. conf := NewConfig()
  321. conf.Producer.Compression = CompressionSnappy
  322. benchmarkProducer(b, conf, "test.1", ByteEncoder(make([]byte, 1024)))
  323. }
  324. func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) {
  325. setupFunctionalTest(b)
  326. defer teardownFunctionalTest(b)
  327. metricsDisable := os.Getenv("METRICS_DISABLE")
  328. if metricsDisable != "" {
  329. previousUseNilMetrics := metrics.UseNilMetrics
  330. Logger.Println("Disabling metrics using no-op implementation")
  331. metrics.UseNilMetrics = true
  332. // Restore previous setting
  333. defer func() {
  334. metrics.UseNilMetrics = previousUseNilMetrics
  335. }()
  336. }
  337. producer, err := NewAsyncProducer(kafkaBrokers, conf)
  338. if err != nil {
  339. b.Fatal(err)
  340. }
  341. b.ResetTimer()
  342. for i := 1; i <= b.N; {
  343. msg := &ProducerMessage{Topic: topic, Key: StringEncoder(fmt.Sprintf("%d", i)), Value: value}
  344. select {
  345. case producer.Input() <- msg:
  346. i++
  347. case ret := <-producer.Errors():
  348. b.Fatal(ret.Err)
  349. }
  350. }
  351. safeClose(b, producer)
  352. }