functional_producer_test.go

// +build functional

package sarama

import (
	"fmt"
	"os"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	toxiproxy "github.com/Shopify/toxiproxy/client"
	"github.com/rcrowley/go-metrics"
)

const TestBatchSize = 1000
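
// The TestFuncProducing* tests below exercise the producer end-to-end against
// the functional-test Kafka cluster set up by setupFunctionalTest, varying the
// compression codec, required acks, and flushing behaviour via the config they
// build.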
func TestFuncProducing(t *testing.T) {
	config := NewConfig()
	testProducingMessages(t, config)
}

func TestFuncProducingGzip(t *testing.T) {
	config := NewConfig()
	config.Producer.Compression = CompressionGZIP
	testProducingMessages(t, config)
}

func TestFuncProducingSnappy(t *testing.T) {
	config := NewConfig()
	config.Producer.Compression = CompressionSnappy
	testProducingMessages(t, config)
}

func TestFuncProducingZstd(t *testing.T) {
	config := NewConfig()
	config.Version = V2_1_0_0
	config.Producer.Compression = CompressionZSTD
	testProducingMessages(t, config)
}

func TestFuncProducingNoResponse(t *testing.T) {
	config := NewConfig()
	config.Producer.RequiredAcks = NoResponse
	testProducingMessages(t, config)
}

func TestFuncProducingFlushing(t *testing.T) {
	config := NewConfig()
	config.Producer.Flush.Messages = TestBatchSize / 8
	config.Producer.Flush.Frequency = 250 * time.Millisecond
	testProducingMessages(t, config)
}
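
// TestFuncMultiPartitionProduce sends TestBatchSize messages concurrently
// through a SyncProducer to the multi-partition "test.64" topic, one goroutine
// per message.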
func TestFuncMultiPartitionProduce(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	config := NewConfig()
	config.ChannelBufferSize = 20
	config.Producer.Flush.Frequency = 50 * time.Millisecond
	config.Producer.Flush.Messages = 200
	config.Producer.Return.Successes = true

	producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}

	var wg sync.WaitGroup
	wg.Add(TestBatchSize)

	for i := 1; i <= TestBatchSize; i++ {
		go func(i int) {
			defer wg.Done()
			msg := &ProducerMessage{Topic: "test.64", Key: nil, Value: StringEncoder(fmt.Sprintf("hur %d", i))}
			if _, _, err := producer.SendMessage(msg); err != nil {
				t.Error(i, err)
			}
		}(i)
	}

	wg.Wait()
	if err := producer.Close(); err != nil {
		t.Error(err)
	}
}
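
// TestFuncProducingToInvalidTopic checks that producing to a topic whose name
// is not valid ("in/valid") surfaces ErrUnknownTopicOrPartition.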
func TestFuncProducingToInvalidTopic(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
	if err != nil {
		t.Fatal(err)
	}

	if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
		t.Error("Expected ErrUnknownTopicOrPartition, found", err)
	}

	if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
		t.Error("Expected ErrUnknownTopicOrPartition, found", err)
	}

	safeClose(t, producer)
}
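
// TestFuncProducingIdempotentWithBrokerFailure verifies the idempotent
// producer: it publishes a few messages, disables the toxiproxy proxies in
// front of the Kafka brokers so that sends fail, then re-enables them and
// checks that the producer recovers and can publish again.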
func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	config := NewConfig()
	config.Producer.Flush.Frequency = 250 * time.Millisecond
	config.Producer.Idempotent = true
	config.Producer.Timeout = 500 * time.Millisecond
	config.Producer.Retry.Max = 1
	config.Producer.Retry.Backoff = 500 * time.Millisecond
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	config.Producer.RequiredAcks = WaitForAll
	config.Net.MaxOpenRequests = 1
	config.Version = V0_11_0_0

	producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, producer)

	// Successfully publish a few messages
	for i := 0; i < 10; i++ {
		_, _, err = producer.SendMessage(&ProducerMessage{
			Topic: "test.1",
			Value: StringEncoder(fmt.Sprintf("%d message", i)),
		})
		if err != nil {
			t.Fatal(err)
		}
	}

	// break the brokers.
	for proxyName, proxy := range FunctionalTestEnv.Proxies {
		if !strings.Contains(proxyName, "kafka") {
			continue
		}
		if err := proxy.Disable(); err != nil {
			t.Fatal(err)
		}
	}

	// This should fail hard now
	for i := 10; i < 20; i++ {
		_, _, err = producer.SendMessage(&ProducerMessage{
			Topic: "test.1",
			Value: StringEncoder(fmt.Sprintf("%d message", i)),
		})
		if err == nil {
			t.Fatal("expected produce to fail while the brokers are unreachable")
		}
	}

	// Now bring the proxy back up
	for proxyName, proxy := range FunctionalTestEnv.Proxies {
		if !strings.Contains(proxyName, "kafka") {
			continue
		}
		if err := proxy.Enable(); err != nil {
			t.Fatal(err)
		}
	}

	// We should be able to publish again (once everything calms down)
	// (otherwise it times out)
	for {
		_, _, err = producer.SendMessage(&ProducerMessage{
			Topic: "test.1",
			Value: StringEncoder("comeback message"),
		})
		if err == nil {
			break
		}
	}
}
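
// TestInterceptors wires appendInterceptor instances into the producer and
// consumer configs and checks that each message value has the interceptor
// counters appended: first by the two producer interceptors (starting at 0 and
// 100), then by the consumer interceptor (starting at 20).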
func TestInterceptors(t *testing.T) {
	config := NewConfig()
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	config.Producer.Return.Successes = true
	config.Consumer.Return.Errors = true
	config.Producer.Interceptors = []ProducerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 100}}
	config.Consumer.Interceptors = []ConsumerInterceptor{&appendInterceptor{i: 20}}

	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}
	initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
	if err != nil {
		t.Fatal(err)
	}

	producer, err := NewAsyncProducerFromClient(client)
	if err != nil {
		t.Fatal(err)
	}
	for i := 0; i < 10; i++ {
		producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
	}
	for i := 0; i < 10; i++ {
		select {
		case msg := <-producer.Errors():
			t.Error(msg.Err)
		case msg := <-producer.Successes():
			v, _ := msg.Value.Encode()
			expected := TestMessage + strconv.Itoa(i) + strconv.Itoa(i+100)
			if string(v) != expected {
				t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
			}
		}
	}
	safeClose(t, producer)

	master, err := NewConsumerFromClient(client)
	if err != nil {
		t.Fatal(err)
	}
	consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
	if err != nil {
		t.Fatal(err)
	}
	for i := 0; i < 10; i++ {
		select {
		case <-time.After(10 * time.Second):
			t.Fatal("No events received in the last 10 seconds.")
		case err := <-consumer.Errors():
			t.Error(err)
		case msg := <-consumer.Messages():
			prodInteExpectation := strconv.Itoa(i) + strconv.Itoa(i+100)
			consInteExpectation := strconv.Itoa(i + 20)
			expected := TestMessage + prodInteExpectation + consInteExpectation
			v := string(msg.Value)
			if v != expected {
				t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
			}
		}
	}
	safeClose(t, consumer)
	safeClose(t, client)
}
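
// testProducingMessages is the shared helper behind the TestFuncProducing*
// tests: it adds 10ms of latency via toxiproxy, produces TestBatchSize
// messages with an AsyncProducer, validates the client metrics, and then
// consumes the messages back to check their contents and ordering.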
func testProducingMessages(t *testing.T, config *Config) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	// Configure some latency in order to properly validate the request latency metric
	for _, proxy := range FunctionalTestEnv.Proxies {
		if _, err := proxy.AddToxic("", "latency", "", 1, toxiproxy.Attributes{"latency": 10}); err != nil {
			t.Fatal("Unable to configure latency toxicity", err)
		}
	}

	config.Producer.Return.Successes = true
	config.Consumer.Return.Errors = true

	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}

	// Keep in mind the current offset
	initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
	if err != nil {
		t.Fatal(err)
	}

	producer, err := NewAsyncProducerFromClient(client)
	if err != nil {
		t.Fatal(err)
	}

	expectedResponses := TestBatchSize
	for i := 1; i <= TestBatchSize; {
		msg := &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
		select {
		case producer.Input() <- msg:
			i++
		case ret := <-producer.Errors():
			t.Fatal(ret.Err)
		case <-producer.Successes():
			expectedResponses--
		}
	}
	for expectedResponses > 0 {
		select {
		case ret := <-producer.Errors():
			t.Fatal(ret.Err)
		case <-producer.Successes():
			expectedResponses--
		}
	}
	safeClose(t, producer)

	// Validate the producer metrics before using the consumer, while the only
	// requests made so far are metadata, offset, and produce requests.
	validateMetrics(t, client)

	master, err := NewConsumerFromClient(client)
	if err != nil {
		t.Fatal(err)
	}
	consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
	if err != nil {
		t.Fatal(err)
	}
	for i := 1; i <= TestBatchSize; i++ {
		select {
		case <-time.After(10 * time.Second):
			t.Fatal("No events received in the last 10 seconds.")
		case err := <-consumer.Errors():
			t.Error(err)
		case message := <-consumer.Messages():
			if string(message.Value) != fmt.Sprintf("testing %d", i) {
				t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
			}
		}
	}

	safeClose(t, consumer)
	safeClose(t, client)
}
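
// validateMetrics checks the go-metrics instruments recorded by the client
// (request/response rates and sizes, latency, batch size, compression ratio,
// records-per-request, requests-in-flight) against minimum or exact
// expectations that depend on the configured RequiredAcks and compression.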
func validateMetrics(t *testing.T, client Client) {
	// Get the broker used by the test.1 topic
	var broker *Broker
	if partitions, err := client.Partitions("test.1"); err != nil {
		t.Error(err)
	} else {
		for _, partition := range partitions {
			if b, err := client.Leader("test.1", partition); err != nil {
				t.Error(err)
			} else {
				if broker != nil && b != broker {
					t.Fatal("Expected only one broker, got at least 2")
				}
				broker = b
			}
		}
	}

	metricValidators := newMetricValidators()
	noResponse := client.Config().Producer.RequiredAcks == NoResponse
	compressionEnabled := client.Config().Producer.Compression != CompressionNone

	// We are adding 10ms of latency to all requests with toxiproxy
	minRequestLatencyInMs := 10
	if noResponse {
		// but when we do not wait for a response it can be less than 1ms
		minRequestLatencyInMs = 0
	}

	// We read at least 1 byte from the broker
	metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1))
	// in at least 3 global requests (1 for metadata request, 1 for offset request and N for produce request)
	metricValidators.register(minCountMeterValidator("request-rate", 3))
	metricValidators.register(minCountHistogramValidator("request-size", 3))
	metricValidators.register(minValHistogramValidator("request-size", 1))
	// and at least 2 requests to the registered broker (offset + produces)
	metricValidators.registerForBroker(broker, minCountMeterValidator("request-rate", 2))
	metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2))
	metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1))
	metricValidators.registerForBroker(broker, minValHistogramValidator("request-latency-in-ms", minRequestLatencyInMs))

	// We send at least 1 batch
	metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("batch-size", 1))
	metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("batch-size", 1))
	if compressionEnabled {
		// We record compression ratios between [0.50, 10.00] (50-1000 with a histogram) for at least one "fake" record
		metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 1))
		metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50))
		metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 1000))
	} else {
		// We record a compression ratio of 1.00 (100 with a histogram) for each of the TestBatchSize records
		metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize))
		metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 100))
		metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 100))
	}

	// We send exactly TestBatchSize messages
	metricValidators.registerForGlobalAndTopic("test_1", countMeterValidator("record-send-rate", TestBatchSize))
	// We send at least one record per request
	metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("records-per-request", 1))
	metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("records-per-request", 1))

	// We send at least 1 byte to the broker
	metricValidators.registerForAllBrokers(broker, minCountMeterValidator("outgoing-byte-rate", 1))
	if noResponse {
		// and receive exactly 2 global responses (metadata + offset)
		metricValidators.register(countMeterValidator("response-rate", 2))
		metricValidators.register(minCountHistogramValidator("response-size", 2))
		// and exactly 1 offset response for the registered broker
		metricValidators.registerForBroker(broker, countMeterValidator("response-rate", 1))
		metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 1))
		metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
	} else {
		// and receive at least 3 global responses (metadata + offset + produces)
		metricValidators.register(minCountMeterValidator("response-rate", 3))
		metricValidators.register(minCountHistogramValidator("response-size", 3))
		// and at least 2 for the registered broker
		metricValidators.registerForBroker(broker, minCountMeterValidator("response-rate", 2))
		metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 2))
		metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
	}

	// There should be no requests in flight anymore
	metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0))

	// Run the validators
	metricValidators.run(t, client.Config().MetricRegistry)
}

// Benchmarks
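
// Like the tests above, the benchmarks below are only compiled with the
// "functional" build tag, so a typical invocation looks something like:
//
//	go test -tags functional -run ^$ -bench Producer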

func BenchmarkProducerSmall(b *testing.B) {
	benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 128)))
}

func BenchmarkProducerMedium(b *testing.B) {
	benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 1024)))
}

func BenchmarkProducerLarge(b *testing.B) {
	benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 8192)))
}

func BenchmarkProducerSmallSinglePartition(b *testing.B) {
	benchmarkProducer(b, nil, "test.1", ByteEncoder(make([]byte, 128)))
}

func BenchmarkProducerMediumSnappy(b *testing.B) {
	conf := NewConfig()
	conf.Producer.Compression = CompressionSnappy
	benchmarkProducer(b, conf, "test.1", ByteEncoder(make([]byte, 1024)))
}
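
// benchmarkProducer drives an AsyncProducer with b.N messages of the given
// value size; setting the METRICS_DISABLE environment variable swaps in the
// no-op go-metrics implementation for the duration of the benchmark.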
func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) {
	setupFunctionalTest(b)
	defer teardownFunctionalTest(b)

	metricsDisable := os.Getenv("METRICS_DISABLE")
	if metricsDisable != "" {
		previousUseNilMetrics := metrics.UseNilMetrics
		Logger.Println("Disabling metrics using no-op implementation")
		metrics.UseNilMetrics = true
		// Restore previous setting
		defer func() {
			metrics.UseNilMetrics = previousUseNilMetrics
		}()
	}

	producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, conf)
	if err != nil {
		b.Fatal(err)
	}

	b.ResetTimer()

	for i := 1; i <= b.N; {
		msg := &ProducerMessage{Topic: topic, Key: StringEncoder(fmt.Sprintf("%d", i)), Value: value}
		select {
		case producer.Input() <- msg:
			i++
		case ret := <-producer.Errors():
			b.Fatal(ret.Err)
		}
	}

	safeClose(b, producer)
}