functional_producer_test.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package sarama
  2. import (
  3. "fmt"
  4. "sync"
  5. "testing"
  6. "time"
  7. )
  8. const TestBatchSize = 1000
  9. func TestFuncProducing(t *testing.T) {
  10. config := NewConfig()
  11. testProducingMessages(t, config)
  12. }
  13. func TestFuncProducingGzip(t *testing.T) {
  14. config := NewConfig()
  15. config.Producer.Compression = CompressionGZIP
  16. testProducingMessages(t, config)
  17. }
  18. func TestFuncProducingSnappy(t *testing.T) {
  19. config := NewConfig()
  20. config.Producer.Compression = CompressionSnappy
  21. testProducingMessages(t, config)
  22. }
  23. func TestFuncProducingNoResponse(t *testing.T) {
  24. config := NewConfig()
  25. config.Producer.RequiredAcks = NoResponse
  26. testProducingMessages(t, config)
  27. }
  28. func TestFuncProducingFlushing(t *testing.T) {
  29. config := NewConfig()
  30. config.Producer.Flush.Messages = TestBatchSize / 8
  31. config.Producer.Flush.Frequency = 250 * time.Millisecond
  32. testProducingMessages(t, config)
  33. }
  34. func TestFuncMultiPartitionProduce(t *testing.T) {
  35. setupFunctionalTest(t)
  36. defer teardownFunctionalTest(t)
  37. config := NewConfig()
  38. config.ChannelBufferSize = 20
  39. config.Producer.Flush.Frequency = 50 * time.Millisecond
  40. config.Producer.Flush.Messages = 200
  41. config.Producer.Return.Successes = true
  42. producer, err := NewSyncProducer(kafkaBrokers, config)
  43. if err != nil {
  44. t.Fatal(err)
  45. }
  46. var wg sync.WaitGroup
  47. wg.Add(TestBatchSize)
  48. for i := 1; i <= TestBatchSize; i++ {
  49. go func(i int) {
  50. defer wg.Done()
  51. msg := &ProducerMessage{Topic: "test.64", Key: nil, Value: StringEncoder(fmt.Sprintf("hur %d", i))}
  52. if _, _, err := producer.SendMessage(msg); err != nil {
  53. t.Error(i, err)
  54. }
  55. }(i)
  56. }
  57. wg.Wait()
  58. if err := producer.Close(); err != nil {
  59. t.Error(err)
  60. }
  61. }
  62. func TestFuncProducingToInvalidTopic(t *testing.T) {
  63. setupFunctionalTest(t)
  64. defer teardownFunctionalTest(t)
  65. producer, err := NewSyncProducer(kafkaBrokers, nil)
  66. if err != nil {
  67. t.Fatal(err)
  68. }
  69. if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
  70. t.Error("Expected ErrUnknownTopicOrPartition, found", err)
  71. }
  72. if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
  73. t.Error("Expected ErrUnknownTopicOrPartition, found", err)
  74. }
  75. safeClose(t, producer)
  76. }
  77. func testProducingMessages(t *testing.T, config *Config) {
  78. setupFunctionalTest(t)
  79. defer teardownFunctionalTest(t)
  80. config.Producer.Return.Successes = true
  81. config.Consumer.Return.Errors = true
  82. client, err := NewClient(kafkaBrokers, config)
  83. if err != nil {
  84. t.Fatal(err)
  85. }
  86. master, err := NewConsumerFromClient(client)
  87. if err != nil {
  88. t.Fatal(err)
  89. }
  90. consumer, err := master.ConsumePartition("test.1", 0, OffsetNewest)
  91. if err != nil {
  92. t.Fatal(err)
  93. }
  94. producer, err := NewAsyncProducerFromClient(client)
  95. if err != nil {
  96. t.Fatal(err)
  97. }
  98. expectedResponses := TestBatchSize
  99. for i := 1; i <= TestBatchSize; {
  100. msg := &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
  101. select {
  102. case producer.Input() <- msg:
  103. i++
  104. case ret := <-producer.Errors():
  105. t.Fatal(ret.Err)
  106. case <-producer.Successes():
  107. expectedResponses--
  108. }
  109. }
  110. for expectedResponses > 0 {
  111. select {
  112. case ret := <-producer.Errors():
  113. t.Fatal(ret.Err)
  114. case <-producer.Successes():
  115. expectedResponses--
  116. }
  117. }
  118. safeClose(t, producer)
  119. for i := 1; i <= TestBatchSize; i++ {
  120. select {
  121. case <-time.After(10 * time.Second):
  122. t.Fatal("Not received any more events in the last 10 seconds.")
  123. case err := <-consumer.Errors():
  124. t.Error(err)
  125. case message := <-consumer.Messages():
  126. if string(message.Value) != fmt.Sprintf("testing %d", i) {
  127. t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
  128. }
  129. }
  130. }
  131. safeClose(t, consumer)
  132. safeClose(t, client)
  133. }
  134. // Benchmarks
  135. func BenchmarkProducerSmall(b *testing.B) {
  136. benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 128)))
  137. }
  138. func BenchmarkProducerMedium(b *testing.B) {
  139. benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 1024)))
  140. }
  141. func BenchmarkProducerLarge(b *testing.B) {
  142. benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 8192)))
  143. }
  144. func BenchmarkProducerSmallSinglePartition(b *testing.B) {
  145. benchmarkProducer(b, nil, "test.1", ByteEncoder(make([]byte, 128)))
  146. }
  147. func BenchmarkProducerMediumSnappy(b *testing.B) {
  148. conf := NewConfig()
  149. conf.Producer.Compression = CompressionSnappy
  150. benchmarkProducer(b, conf, "test.1", ByteEncoder(make([]byte, 1024)))
  151. }
  152. func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) {
  153. setupFunctionalTest(b)
  154. defer teardownFunctionalTest(b)
  155. producer, err := NewAsyncProducer(kafkaBrokers, conf)
  156. if err != nil {
  157. b.Fatal(err)
  158. }
  159. b.ResetTimer()
  160. for i := 1; i <= b.N; {
  161. msg := &ProducerMessage{Topic: topic, Key: StringEncoder(fmt.Sprintf("%d", i)), Value: value}
  162. select {
  163. case producer.Input() <- msg:
  164. i++
  165. case ret := <-producer.Errors():
  166. b.Fatal(ret.Err)
  167. }
  168. }
  169. safeClose(b, producer)
  170. }