functional_producer_test.go

package sarama

import (
	"fmt"
	"os"
	"sync"
	"testing"
	"time"

	"github.com/rcrowley/go-metrics"
)

const TestBatchSize = 1000

func TestFuncProducing(t *testing.T) {
	config := NewConfig()
	testProducingMessages(t, config)
}

func TestFuncProducingGzip(t *testing.T) {
	config := NewConfig()
	config.Producer.Compression = CompressionGZIP
	testProducingMessages(t, config)
}

func TestFuncProducingSnappy(t *testing.T) {
	config := NewConfig()
	config.Producer.Compression = CompressionSnappy
	testProducingMessages(t, config)
}

func TestFuncProducingNoResponse(t *testing.T) {
	config := NewConfig()
	config.Producer.RequiredAcks = NoResponse
	testProducingMessages(t, config)
}

func TestFuncProducingFlushing(t *testing.T) {
	config := NewConfig()
	config.Producer.Flush.Messages = TestBatchSize / 8
	config.Producer.Flush.Frequency = 250 * time.Millisecond
	testProducingMessages(t, config)
}

func TestFuncMultiPartitionProduce(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	config := NewConfig()
	config.ChannelBufferSize = 20
	config.Producer.Flush.Frequency = 50 * time.Millisecond
	config.Producer.Flush.Messages = 200
	config.Producer.Return.Successes = true
	producer, err := NewSyncProducer(kafkaBrokers, config)
	if err != nil {
		t.Fatal(err)
	}

	var wg sync.WaitGroup
	wg.Add(TestBatchSize)
	for i := 1; i <= TestBatchSize; i++ {
		go func(i int) {
			defer wg.Done()
			msg := &ProducerMessage{Topic: "test.64", Key: nil, Value: StringEncoder(fmt.Sprintf("hur %d", i))}
			if _, _, err := producer.SendMessage(msg); err != nil {
				t.Error(i, err)
			}
		}(i)
	}

	wg.Wait()
	if err := producer.Close(); err != nil {
		t.Error(err)
	}
}
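
// SendMessage also returns the partition and offset assigned to each record;
// the test above discards them. The helper below is a minimal sketch, not part
// of the original suite (its name and message contents are illustrative), of
// how a test could assert on those return values, assuming the same
// kafkaBrokers and functional-test setup used elsewhere in this file.
func produceAndCheckOffsets(t *testing.T, topic string, count int) {
	config := NewConfig()
	// The SyncProducer requires successes to be returned.
	config.Producer.Return.Successes = true

	producer, err := NewSyncProducer(kafkaBrokers, config)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, producer)

	for i := 0; i < count; i++ {
		msg := &ProducerMessage{Topic: topic, Value: StringEncoder(fmt.Sprintf("offset check %d", i))}
		partition, offset, err := producer.SendMessage(msg)
		if err != nil {
			t.Error(i, err)
			continue
		}
		if offset < 0 {
			t.Errorf("message %d: unexpected offset %d on partition %d", i, offset, partition)
		}
	}
}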

func TestFuncProducingToInvalidTopic(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	producer, err := NewSyncProducer(kafkaBrokers, nil)
	if err != nil {
		t.Fatal(err)
	}

	if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
		t.Error("Expected ErrUnknownTopicOrPartition, found", err)
	}

	// A second attempt should fail with the same error.
	if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
		t.Error("Expected ErrUnknownTopicOrPartition, found", err)
	}

	safeClose(t, producer)
}

func testProducingMessages(t *testing.T, config *Config) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	config.Producer.Return.Successes = true
	config.Consumer.Return.Errors = true

	client, err := NewClient(kafkaBrokers, config)
	if err != nil {
		t.Fatal(err)
	}

	// Remember the current offset so the consumer can start reading from it later.
	initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
	if err != nil {
		t.Fatal(err)
	}

	producer, err := NewAsyncProducerFromClient(client)
	if err != nil {
		t.Fatal(err)
	}

	expectedResponses := TestBatchSize
	// i only advances when Input() accepts a message, so successes and errors
	// keep being drained while the producer applies backpressure.
	for i := 1; i <= TestBatchSize; {
		msg := &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
		select {
		case producer.Input() <- msg:
			i++
		case ret := <-producer.Errors():
			t.Fatal(ret.Err)
		case <-producer.Successes():
			expectedResponses--
		}
	}
	for expectedResponses > 0 {
		select {
		case ret := <-producer.Errors():
			t.Fatal(ret.Err)
		case <-producer.Successes():
			expectedResponses--
		}
	}
	safeClose(t, producer)

	// Validate the producer metrics before the consumer is used; at this point
	// only the metadata, offset, and produce requests have been issued.
	validateMetrics(t, client)

	master, err := NewConsumerFromClient(client)
	if err != nil {
		t.Fatal(err)
	}
	consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
	if err != nil {
		t.Fatal(err)
	}

	for i := 1; i <= TestBatchSize; i++ {
		select {
		case <-time.After(10 * time.Second):
			t.Fatal("No more events received in the last 10 seconds.")
		case err := <-consumer.Errors():
			t.Error(err)
		case message := <-consumer.Messages():
			if string(message.Value) != fmt.Sprintf("testing %d", i) {
				t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
			}
		}
	}

	safeClose(t, consumer)
	safeClose(t, client)
}
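
// The select loop above interleaves sends with draining Successes and Errors
// so that neither channel fills up while messages are in flight. An
// alternative shutdown pattern is sketched below (this helper is illustrative
// and not part of the original suite): once all sends have been issued, call
// AsyncClose and range over both channels until the producer closes them.
func drainAsyncProducer(t *testing.T, producer AsyncProducer) {
	producer.AsyncClose()

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		// Errors() is closed once the producer has fully shut down.
		for perr := range producer.Errors() {
			t.Error(perr.Err)
		}
	}()
	go func() {
		defer wg.Done()
		// Successes() is closed as well; the acknowledged messages are discarded here.
		for range producer.Successes() {
		}
	}()
	wg.Wait()
}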

func validateMetrics(t *testing.T, client Client) {
	// Get the broker used by the test.1 topic
	var broker *Broker
	if partitions, err := client.Partitions("test.1"); err != nil {
		t.Error(err)
	} else {
		for _, partition := range partitions {
			if b, err := client.Leader("test.1", partition); err != nil {
				t.Error(err)
			} else {
				if broker != nil && b != broker {
					t.Fatal("Expected only one broker, got at least 2")
				}
				broker = b
			}
		}
	}

	metricValidators := newMetricValidators()
	noResponse := client.Config().Producer.RequiredAcks == NoResponse

	// We read at least 1 byte from the broker
	metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1))
	// and send at least 3 global requests (1 metadata request, 1 offset request and N produce requests)
	metricValidators.register(minCountMeterValidator("request-rate", 3))
	metricValidators.register(minCountHistogramValidator("request-size", 3))
	metricValidators.register(minValHistogramValidator("request-size", 1))
	// of which at least 2 go to the registered broker (offset + produces)
	metricValidators.registerForBroker(broker, minCountMeterValidator("request-rate", 2))
	metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2))
	metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1))

	// We write at least 1 byte to the broker
	metricValidators.registerForAllBrokers(broker, minCountMeterValidator("outgoing-byte-rate", 1))
	if noResponse {
		// and get exactly 2 global responses (metadata + offset)
		metricValidators.register(countMeterValidator("response-rate", 2))
		metricValidators.register(minCountHistogramValidator("response-size", 2))
		metricValidators.register(minValHistogramValidator("response-size", 1))
		// of which exactly 1 (the offset response) is for the registered broker
		metricValidators.registerForBroker(broker, countMeterValidator("response-rate", 1))
		metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 1))
		metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
	} else {
		// and get at least 3 global responses (metadata + offset + produces)
		metricValidators.register(minCountMeterValidator("response-rate", 3))
		metricValidators.register(minCountHistogramValidator("response-size", 3))
		metricValidators.register(minValHistogramValidator("response-size", 1))
		// of which at least 2 are for the registered broker
		metricValidators.registerForBroker(broker, minCountMeterValidator("response-rate", 2))
		metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 2))
		metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
	}

	// Run the validators
	metricValidators.run(t, client.Config().MetricRegistry)
}
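
// The validator helpers above come from this package's metrics test support.
// For reference, a metric can also be read straight from the go-metrics
// registry; the sketch below (illustrative, not part of the original suite)
// assumes the global "request-rate" meter name used above.
func requestRateCount(registry metrics.Registry) int64 {
	// Registry.Get returns nil if the metric has not been registered yet.
	if meter, ok := registry.Get("request-rate").(metrics.Meter); ok {
		return meter.Count()
	}
	return 0
}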

// Benchmarks

func BenchmarkProducerSmall(b *testing.B) {
	benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 128)))
}

func BenchmarkProducerMedium(b *testing.B) {
	benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 1024)))
}

func BenchmarkProducerLarge(b *testing.B) {
	benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 8192)))
}

func BenchmarkProducerSmallSinglePartition(b *testing.B) {
	benchmarkProducer(b, nil, "test.1", ByteEncoder(make([]byte, 128)))
}

func BenchmarkProducerMediumSnappy(b *testing.B) {
	conf := NewConfig()
	conf.Producer.Compression = CompressionSnappy
	benchmarkProducer(b, conf, "test.1", ByteEncoder(make([]byte, 1024)))
}

func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) {
	setupFunctionalTest(b)
	defer teardownFunctionalTest(b)

	metricsDisable := os.Getenv("METRICS_DISABLE")
	if metricsDisable != "" {
		previousUseNilMetrics := metrics.UseNilMetrics
		Logger.Println("Disabling metrics using no-op implementation")
		metrics.UseNilMetrics = true
		// Restore previous setting
		defer func() {
			metrics.UseNilMetrics = previousUseNilMetrics
		}()
	}

	producer, err := NewAsyncProducer(kafkaBrokers, conf)
	if err != nil {
		b.Fatal(err)
	}

	b.ResetTimer()

	for i := 1; i <= b.N; {
		msg := &ProducerMessage{Topic: topic, Key: StringEncoder(fmt.Sprintf("%d", i)), Value: value}
		select {
		case producer.Input() <- msg:
			i++
		case ret := <-producer.Errors():
			b.Fatal(ret.Err)
		}
	}

	safeClose(b, producer)
}
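
// To benchmark the raw producer path without metrics accounting, the helper
// above honours the METRICS_DISABLE environment variable. For example
// (assuming the Kafka brokers used by this package's functional tests are
// available):
//
//	METRICS_DISABLE=1 go test -run '^$' -bench BenchmarkProducer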