functional_producer_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. //+build functional
  2. package sarama
  3. import (
  4. "fmt"
  5. "os"
  6. "strings"
  7. "sync"
  8. "testing"
  9. "time"
  10. toxiproxy "github.com/Shopify/toxiproxy/client"
  11. "github.com/rcrowley/go-metrics"
  12. )
// TestBatchSize is the number of messages the functional producer tests in
// this file send per run; the metric validation also expects exactly this
// many records to have been counted.
const TestBatchSize = 1000
  14. func TestFuncProducing(t *testing.T) {
  15. config := NewConfig()
  16. testProducingMessages(t, config)
  17. }
  18. func TestFuncProducingGzip(t *testing.T) {
  19. config := NewConfig()
  20. config.Producer.Compression = CompressionGZIP
  21. testProducingMessages(t, config)
  22. }
  23. func TestFuncProducingSnappy(t *testing.T) {
  24. config := NewConfig()
  25. config.Producer.Compression = CompressionSnappy
  26. testProducingMessages(t, config)
  27. }
  28. func TestFuncProducingZstd(t *testing.T) {
  29. config := NewConfig()
  30. config.Version = V2_1_0_0
  31. config.Producer.Compression = CompressionZSTD
  32. testProducingMessages(t, config)
  33. }
  34. func TestFuncProducingNoResponse(t *testing.T) {
  35. config := NewConfig()
  36. config.Producer.RequiredAcks = NoResponse
  37. testProducingMessages(t, config)
  38. }
  39. func TestFuncProducingFlushing(t *testing.T) {
  40. config := NewConfig()
  41. config.Producer.Flush.Messages = TestBatchSize / 8
  42. config.Producer.Flush.Frequency = 250 * time.Millisecond
  43. testProducingMessages(t, config)
  44. }
  45. func TestFuncMultiPartitionProduce(t *testing.T) {
  46. setupFunctionalTest(t)
  47. defer teardownFunctionalTest(t)
  48. config := NewConfig()
  49. config.ChannelBufferSize = 20
  50. config.Producer.Flush.Frequency = 50 * time.Millisecond
  51. config.Producer.Flush.Messages = 200
  52. config.Producer.Return.Successes = true
  53. producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
  54. if err != nil {
  55. t.Fatal(err)
  56. }
  57. var wg sync.WaitGroup
  58. wg.Add(TestBatchSize)
  59. for i := 1; i <= TestBatchSize; i++ {
  60. go func(i int) {
  61. defer wg.Done()
  62. msg := &ProducerMessage{Topic: "test.64", Key: nil, Value: StringEncoder(fmt.Sprintf("hur %d", i))}
  63. if _, _, err := producer.SendMessage(msg); err != nil {
  64. t.Error(i, err)
  65. }
  66. }(i)
  67. }
  68. wg.Wait()
  69. if err := producer.Close(); err != nil {
  70. t.Error(err)
  71. }
  72. }
  73. func TestFuncProducingToInvalidTopic(t *testing.T) {
  74. setupFunctionalTest(t)
  75. defer teardownFunctionalTest(t)
  76. producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
  77. if err != nil {
  78. t.Fatal(err)
  79. }
  80. if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
  81. t.Error("Expected ErrUnknownTopicOrPartition, found", err)
  82. }
  83. if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
  84. t.Error("Expected ErrUnknownTopicOrPartition, found", err)
  85. }
  86. safeClose(t, producer)
  87. }
  88. func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
  89. setupFunctionalTest(t)
  90. defer teardownFunctionalTest(t)
  91. config := NewConfig()
  92. config.Producer.Flush.Frequency = 250 * time.Millisecond
  93. config.Producer.Idempotent = true
  94. config.Producer.Timeout = 500 * time.Millisecond
  95. config.Producer.Retry.Max = 1
  96. config.Producer.Retry.Backoff = 500 * time.Millisecond
  97. config.Producer.Return.Successes = true
  98. config.Producer.Return.Errors = true
  99. config.Producer.RequiredAcks = WaitForAll
  100. config.Net.MaxOpenRequests = 1
  101. config.Version = V0_11_0_0
  102. producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
  103. if err != nil {
  104. t.Fatal(err)
  105. }
  106. defer safeClose(t, producer)
  107. // Successfully publish a few messages
  108. for i := 0; i < 10; i++ {
  109. _, _, err = producer.SendMessage(&ProducerMessage{
  110. Topic: "test.1",
  111. Value: StringEncoder(fmt.Sprintf("%d message", i)),
  112. })
  113. if err != nil {
  114. t.Fatal(err)
  115. }
  116. }
  117. // break the brokers.
  118. for proxyName, proxy := range FunctionalTestEnv.Proxies {
  119. if !strings.Contains(proxyName, "kafka") {
  120. continue
  121. }
  122. if err := proxy.Disable(); err != nil {
  123. t.Fatal(err)
  124. }
  125. }
  126. // This should fail hard now
  127. for i := 10; i < 20; i++ {
  128. _, _, err = producer.SendMessage(&ProducerMessage{
  129. Topic: "test.1",
  130. Value: StringEncoder(fmt.Sprintf("%d message", i)),
  131. })
  132. if err == nil {
  133. t.Fatal(err)
  134. }
  135. }
  136. // Now bring the proxy back up
  137. for proxyName, proxy := range FunctionalTestEnv.Proxies {
  138. if !strings.Contains(proxyName, "kafka") {
  139. continue
  140. }
  141. if err := proxy.Enable(); err != nil {
  142. t.Fatal(err)
  143. }
  144. }
  145. // We should be able to publish again (once everything calms down)
  146. // (otherwise it times out)
  147. for {
  148. _, _, err = producer.SendMessage(&ProducerMessage{
  149. Topic: "test.1",
  150. Value: StringEncoder("comeback message"),
  151. })
  152. if err == nil {
  153. break
  154. }
  155. }
  156. }
  157. func testProducingMessages(t *testing.T, config *Config) {
  158. setupFunctionalTest(t)
  159. defer teardownFunctionalTest(t)
  160. // Configure some latency in order to properly validate the request latency metric
  161. for _, proxy := range FunctionalTestEnv.Proxies {
  162. if _, err := proxy.AddToxic("", "latency", "", 1, toxiproxy.Attributes{"latency": 10}); err != nil {
  163. t.Fatal("Unable to configure latency toxicity", err)
  164. }
  165. }
  166. config.Producer.Return.Successes = true
  167. config.Consumer.Return.Errors = true
  168. client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
  169. if err != nil {
  170. t.Fatal(err)
  171. }
  172. // Keep in mind the current offset
  173. initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
  174. if err != nil {
  175. t.Fatal(err)
  176. }
  177. producer, err := NewAsyncProducerFromClient(client)
  178. if err != nil {
  179. t.Fatal(err)
  180. }
  181. expectedResponses := TestBatchSize
  182. for i := 1; i <= TestBatchSize; {
  183. msg := &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
  184. select {
  185. case producer.Input() <- msg:
  186. i++
  187. case ret := <-producer.Errors():
  188. t.Fatal(ret.Err)
  189. case <-producer.Successes():
  190. expectedResponses--
  191. }
  192. }
  193. for expectedResponses > 0 {
  194. select {
  195. case ret := <-producer.Errors():
  196. t.Fatal(ret.Err)
  197. case <-producer.Successes():
  198. expectedResponses--
  199. }
  200. }
  201. safeClose(t, producer)
  202. // Validate producer metrics before using the consumer minus the offset request
  203. validateMetrics(t, client)
  204. master, err := NewConsumerFromClient(client)
  205. if err != nil {
  206. t.Fatal(err)
  207. }
  208. consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
  209. if err != nil {
  210. t.Fatal(err)
  211. }
  212. for i := 1; i <= TestBatchSize; i++ {
  213. select {
  214. case <-time.After(10 * time.Second):
  215. t.Fatal("Not received any more events in the last 10 seconds.")
  216. case err := <-consumer.Errors():
  217. t.Error(err)
  218. case message := <-consumer.Messages():
  219. if string(message.Value) != fmt.Sprintf("testing %d", i) {
  220. t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
  221. }
  222. }
  223. }
  224. safeClose(t, consumer)
  225. safeClose(t, client)
  226. }
  227. func validateMetrics(t *testing.T, client Client) {
  228. // Get the broker used by test1 topic
  229. var broker *Broker
  230. if partitions, err := client.Partitions("test.1"); err != nil {
  231. t.Error(err)
  232. } else {
  233. for _, partition := range partitions {
  234. if b, err := client.Leader("test.1", partition); err != nil {
  235. t.Error(err)
  236. } else {
  237. if broker != nil && b != broker {
  238. t.Fatal("Expected only one broker, got at least 2")
  239. }
  240. broker = b
  241. }
  242. }
  243. }
  244. metricValidators := newMetricValidators()
  245. noResponse := client.Config().Producer.RequiredAcks == NoResponse
  246. compressionEnabled := client.Config().Producer.Compression != CompressionNone
  247. // We are adding 10ms of latency to all requests with toxiproxy
  248. minRequestLatencyInMs := 10
  249. if noResponse {
  250. // but when we do not wait for a response it can be less than 1ms
  251. minRequestLatencyInMs = 0
  252. }
  253. // We read at least 1 byte from the broker
  254. metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1))
  255. // in at least 3 global requests (1 for metadata request, 1 for offset request and N for produce request)
  256. metricValidators.register(minCountMeterValidator("request-rate", 3))
  257. metricValidators.register(minCountHistogramValidator("request-size", 3))
  258. metricValidators.register(minValHistogramValidator("request-size", 1))
  259. // and at least 2 requests to the registered broker (offset + produces)
  260. metricValidators.registerForBroker(broker, minCountMeterValidator("request-rate", 2))
  261. metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2))
  262. metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1))
  263. metricValidators.registerForBroker(broker, minValHistogramValidator("request-latency-in-ms", minRequestLatencyInMs))
  264. // We send at least 1 batch
  265. metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("batch-size", 1))
  266. metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("batch-size", 1))
  267. if compressionEnabled {
  268. // We record compression ratios between [0.50,-10.00] (50-1000 with a histogram) for at least one "fake" record
  269. metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 1))
  270. metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50))
  271. metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 1000))
  272. } else {
  273. // We record compression ratios of 1.00 (100 with a histogram) for every TestBatchSize record
  274. metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize))
  275. metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 100))
  276. metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 100))
  277. }
  278. // We send exactly TestBatchSize messages
  279. metricValidators.registerForGlobalAndTopic("test_1", countMeterValidator("record-send-rate", TestBatchSize))
  280. // We send at least one record per request
  281. metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("records-per-request", 1))
  282. metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("records-per-request", 1))
  283. // We receive at least 1 byte from the broker
  284. metricValidators.registerForAllBrokers(broker, minCountMeterValidator("outgoing-byte-rate", 1))
  285. if noResponse {
  286. // in exactly 2 global responses (metadata + offset)
  287. metricValidators.register(countMeterValidator("response-rate", 2))
  288. metricValidators.register(minCountHistogramValidator("response-size", 2))
  289. // and exactly 1 offset response for the registered broker
  290. metricValidators.registerForBroker(broker, countMeterValidator("response-rate", 1))
  291. metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 1))
  292. metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
  293. } else {
  294. // in at least 3 global responses (metadata + offset + produces)
  295. metricValidators.register(minCountMeterValidator("response-rate", 3))
  296. metricValidators.register(minCountHistogramValidator("response-size", 3))
  297. // and at least 2 for the registered broker
  298. metricValidators.registerForBroker(broker, minCountMeterValidator("response-rate", 2))
  299. metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 2))
  300. metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
  301. }
  302. // There should be no requests in flight anymore
  303. metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0))
  304. // Run the validators
  305. metricValidators.run(t, client.Config().MetricRegistry)
  306. }
  307. // Benchmarks
  308. func BenchmarkProducerSmall(b *testing.B) {
  309. benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 128)))
  310. }
  311. func BenchmarkProducerMedium(b *testing.B) {
  312. benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 1024)))
  313. }
  314. func BenchmarkProducerLarge(b *testing.B) {
  315. benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 8192)))
  316. }
  317. func BenchmarkProducerSmallSinglePartition(b *testing.B) {
  318. benchmarkProducer(b, nil, "test.1", ByteEncoder(make([]byte, 128)))
  319. }
  320. func BenchmarkProducerMediumSnappy(b *testing.B) {
  321. conf := NewConfig()
  322. conf.Producer.Compression = CompressionSnappy
  323. benchmarkProducer(b, conf, "test.1", ByteEncoder(make([]byte, 1024)))
  324. }
  325. func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) {
  326. setupFunctionalTest(b)
  327. defer teardownFunctionalTest(b)
  328. metricsDisable := os.Getenv("METRICS_DISABLE")
  329. if metricsDisable != "" {
  330. previousUseNilMetrics := metrics.UseNilMetrics
  331. Logger.Println("Disabling metrics using no-op implementation")
  332. metrics.UseNilMetrics = true
  333. // Restore previous setting
  334. defer func() {
  335. metrics.UseNilMetrics = previousUseNilMetrics
  336. }()
  337. }
  338. producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, conf)
  339. if err != nil {
  340. b.Fatal(err)
  341. }
  342. b.ResetTimer()
  343. for i := 1; i <= b.N; {
  344. msg := &ProducerMessage{Topic: topic, Key: StringEncoder(fmt.Sprintf("%d", i)), Value: value}
  345. select {
  346. case producer.Input() <- msg:
  347. i++
  348. case ret := <-producer.Errors():
  349. b.Fatal(ret.Err)
  350. }
  351. }
  352. safeClose(b, producer)
  353. }