functional_producer_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. package sarama
  2. import (
  3. "fmt"
  4. "os"
  5. "sync"
  6. "testing"
  7. "time"
  8. toxiproxy "github.com/Shopify/toxiproxy/client"
  9. "github.com/rcrowley/go-metrics"
  10. )
  // TestBatchSize is the number of messages the functional producer tests in
  // this file send per run; the flushing test also derives its flush threshold
  // from it.
  11. const TestBatchSize = 1000
  12. func TestFuncProducing(t *testing.T) {
  13. config := NewConfig()
  14. testProducingMessages(t, config)
  15. }
  16. func TestFuncProducingGzip(t *testing.T) {
  17. config := NewConfig()
  18. config.Producer.Compression = CompressionGZIP
  19. testProducingMessages(t, config)
  20. }
  21. func TestFuncProducingSnappy(t *testing.T) {
  22. config := NewConfig()
  23. config.Producer.Compression = CompressionSnappy
  24. testProducingMessages(t, config)
  25. }
  26. func TestFuncProducingNoResponse(t *testing.T) {
  27. config := NewConfig()
  28. config.Producer.RequiredAcks = NoResponse
  29. testProducingMessages(t, config)
  30. }
  31. func TestFuncProducingFlushing(t *testing.T) {
  32. config := NewConfig()
  33. config.Producer.Flush.Messages = TestBatchSize / 8
  34. config.Producer.Flush.Frequency = 250 * time.Millisecond
  35. testProducingMessages(t, config)
  36. }
  37. func TestFuncMultiPartitionProduce(t *testing.T) {
  38. setupFunctionalTest(t)
  39. defer teardownFunctionalTest(t)
  40. config := NewConfig()
  41. config.ChannelBufferSize = 20
  42. config.Producer.Flush.Frequency = 50 * time.Millisecond
  43. config.Producer.Flush.Messages = 200
  44. config.Producer.Return.Successes = true
  45. producer, err := NewSyncProducer(kafkaBrokers, config)
  46. if err != nil {
  47. t.Fatal(err)
  48. }
  49. var wg sync.WaitGroup
  50. wg.Add(TestBatchSize)
  51. for i := 1; i <= TestBatchSize; i++ {
  52. go func(i int) {
  53. defer wg.Done()
  54. msg := &ProducerMessage{Topic: "test.64", Key: nil, Value: StringEncoder(fmt.Sprintf("hur %d", i))}
  55. if _, _, err := producer.SendMessage(msg); err != nil {
  56. t.Error(i, err)
  57. }
  58. }(i)
  59. }
  60. wg.Wait()
  61. if err := producer.Close(); err != nil {
  62. t.Error(err)
  63. }
  64. }
  65. func TestFuncProducingToInvalidTopic(t *testing.T) {
  66. setupFunctionalTest(t)
  67. defer teardownFunctionalTest(t)
  68. producer, err := NewSyncProducer(kafkaBrokers, nil)
  69. if err != nil {
  70. t.Fatal(err)
  71. }
  72. if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
  73. t.Error("Expected ErrUnknownTopicOrPartition, found", err)
  74. }
  75. if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
  76. t.Error("Expected ErrUnknownTopicOrPartition, found", err)
  77. }
  78. safeClose(t, producer)
  79. }
  80. func testProducingMessages(t *testing.T, config *Config) {
  81. setupFunctionalTest(t)
  82. defer teardownFunctionalTest(t)
  83. // Configure some latency in order to properly validate the request latency metric
  84. for _, proxy := range Proxies {
  85. if _, err := proxy.AddToxic("", "latency", "", 1, toxiproxy.Attributes{"latency": 10}); err != nil {
  86. t.Fatal("Unable to configure latency toxicity", err)
  87. }
  88. }
  89. config.Producer.Return.Successes = true
  90. config.Consumer.Return.Errors = true
  91. client, err := NewClient(kafkaBrokers, config)
  92. if err != nil {
  93. t.Fatal(err)
  94. }
  95. // Keep in mind the current offset
  96. initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
  97. if err != nil {
  98. t.Fatal(err)
  99. }
  100. producer, err := NewAsyncProducerFromClient(client)
  101. if err != nil {
  102. t.Fatal(err)
  103. }
  104. expectedResponses := TestBatchSize
  105. for i := 1; i <= TestBatchSize; {
  106. msg := &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
  107. select {
  108. case producer.Input() <- msg:
  109. i++
  110. case ret := <-producer.Errors():
  111. t.Fatal(ret.Err)
  112. case <-producer.Successes():
  113. expectedResponses--
  114. }
  115. }
  116. for expectedResponses > 0 {
  117. select {
  118. case ret := <-producer.Errors():
  119. t.Fatal(ret.Err)
  120. case <-producer.Successes():
  121. expectedResponses--
  122. }
  123. }
  124. safeClose(t, producer)
  125. // Validate producer metrics before using the consumer minus the offset request
  126. validateMetrics(t, client)
  127. master, err := NewConsumerFromClient(client)
  128. if err != nil {
  129. t.Fatal(err)
  130. }
  131. consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
  132. if err != nil {
  133. t.Fatal(err)
  134. }
  135. for i := 1; i <= TestBatchSize; i++ {
  136. select {
  137. case <-time.After(10 * time.Second):
  138. t.Fatal("Not received any more events in the last 10 seconds.")
  139. case err := <-consumer.Errors():
  140. t.Error(err)
  141. case message := <-consumer.Messages():
  142. if string(message.Value) != fmt.Sprintf("testing %d", i) {
  143. t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
  144. }
  145. }
  146. }
  147. safeClose(t, consumer)
  148. safeClose(t, client)
  149. }
  150. func validateMetrics(t *testing.T, client Client) {
  151. // Get the broker used by test1 topic
  152. var broker *Broker
  153. if partitions, err := client.Partitions("test.1"); err != nil {
  154. t.Error(err)
  155. } else {
  156. for _, partition := range partitions {
  157. if b, err := client.Leader("test.1", partition); err != nil {
  158. t.Error(err)
  159. } else {
  160. if broker != nil && b != broker {
  161. t.Fatal("Expected only one broker, got at least 2")
  162. }
  163. broker = b
  164. }
  165. }
  166. }
  167. metricValidators := newMetricValidators()
  168. noResponse := client.Config().Producer.RequiredAcks == NoResponse
  169. compressionEnabled := client.Config().Producer.Compression != CompressionNone
  170. // We are adding 10ms of latency to all requests with toxiproxy
  171. minRequestLatencyInMs := 10
  172. if noResponse {
  173. // but when we do not wait for a response it can be less than 1ms
  174. minRequestLatencyInMs = 0
  175. }
  176. // We read at least 1 byte from the broker
  177. metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1))
  178. // in at least 3 global requests (1 for metadata request, 1 for offset request and N for produce request)
  179. metricValidators.register(minCountMeterValidator("request-rate", 3))
  180. metricValidators.register(minCountHistogramValidator("request-size", 3))
  181. metricValidators.register(minValHistogramValidator("request-size", 1))
  182. // and at least 2 requests to the registered broker (offset + produces)
  183. metricValidators.registerForBroker(broker, minCountMeterValidator("request-rate", 2))
  184. metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2))
  185. metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1))
  186. metricValidators.registerForBroker(broker, minValHistogramValidator("request-latency-in-ms", minRequestLatencyInMs))
  187. // We send at least 1 batch
  188. metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("batch-size", 1))
  189. metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("batch-size", 1))
  190. if compressionEnabled {
  191. // We record compression ratios between [0.50,-10.00] (50-1000 with a histogram) for at least one "fake" record
  192. metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 1))
  193. metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50))
  194. metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 1000))
  195. } else {
  196. // We record compression ratios of 1.00 (100 with a histogram) for every TestBatchSize record
  197. metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize))
  198. metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 100))
  199. metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 100))
  200. }
  201. // We send exactly TestBatchSize messages
  202. metricValidators.registerForGlobalAndTopic("test_1", countMeterValidator("record-send-rate", TestBatchSize))
  203. // We send at least one record per request
  204. metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("records-per-request", 1))
  205. metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("records-per-request", 1))
  206. // We receive at least 1 byte from the broker
  207. metricValidators.registerForAllBrokers(broker, minCountMeterValidator("outgoing-byte-rate", 1))
  208. if noResponse {
  209. // in exactly 2 global responses (metadata + offset)
  210. metricValidators.register(countMeterValidator("response-rate", 2))
  211. metricValidators.register(minCountHistogramValidator("response-size", 2))
  212. // and exactly 1 offset response for the registered broker
  213. metricValidators.registerForBroker(broker, countMeterValidator("response-rate", 1))
  214. metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 1))
  215. metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
  216. } else {
  217. // in at least 3 global responses (metadata + offset + produces)
  218. metricValidators.register(minCountMeterValidator("response-rate", 3))
  219. metricValidators.register(minCountHistogramValidator("response-size", 3))
  220. // and at least 2 for the registered broker
  221. metricValidators.registerForBroker(broker, minCountMeterValidator("response-rate", 2))
  222. metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 2))
  223. metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
  224. }
  225. // Run the validators
  226. metricValidators.run(t, client.Config().MetricRegistry)
  227. }
  228. // Benchmarks
  229. func BenchmarkProducerSmall(b *testing.B) {
  230. benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 128)))
  231. }
  232. func BenchmarkProducerMedium(b *testing.B) {
  233. benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 1024)))
  234. }
  235. func BenchmarkProducerLarge(b *testing.B) {
  236. benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 8192)))
  237. }
  238. func BenchmarkProducerSmallSinglePartition(b *testing.B) {
  239. benchmarkProducer(b, nil, "test.1", ByteEncoder(make([]byte, 128)))
  240. }
  241. func BenchmarkProducerMediumSnappy(b *testing.B) {
  242. conf := NewConfig()
  243. conf.Producer.Compression = CompressionSnappy
  244. benchmarkProducer(b, conf, "test.1", ByteEncoder(make([]byte, 1024)))
  245. }
  246. func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) {
  247. setupFunctionalTest(b)
  248. defer teardownFunctionalTest(b)
  249. metricsDisable := os.Getenv("METRICS_DISABLE")
  250. if metricsDisable != "" {
  251. previousUseNilMetrics := metrics.UseNilMetrics
  252. Logger.Println("Disabling metrics using no-op implementation")
  253. metrics.UseNilMetrics = true
  254. // Restore previous setting
  255. defer func() {
  256. metrics.UseNilMetrics = previousUseNilMetrics
  257. }()
  258. }
  259. producer, err := NewAsyncProducer(kafkaBrokers, conf)
  260. if err != nil {
  261. b.Fatal(err)
  262. }
  263. b.ResetTimer()
  264. for i := 1; i <= b.N; {
  265. msg := &ProducerMessage{Topic: topic, Key: StringEncoder(fmt.Sprintf("%d", i)), Value: value}
  266. select {
  267. case producer.Input() <- msg:
  268. i++
  269. case ret := <-producer.Errors():
  270. b.Fatal(ret.Err)
  271. }
  272. }
  273. safeClose(b, producer)
  274. }