functional_consumer_test.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package sarama
  2. import (
  3. "fmt"
  4. "math"
  5. "os"
  6. "sort"
  7. "sync"
  8. "testing"
  9. "time"
  10. )
  11. func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
  12. setupFunctionalTest(t)
  13. defer teardownFunctionalTest(t)
  14. consumer, err := NewConsumer(kafkaBrokers, nil)
  15. if err != nil {
  16. t.Fatal(err)
  17. }
  18. if _, err := consumer.ConsumePartition("test.1", 0, -10); err != ErrOffsetOutOfRange {
  19. t.Error("Expected ErrOffsetOutOfRange, got:", err)
  20. }
  21. if _, err := consumer.ConsumePartition("test.1", 0, math.MaxInt64); err != ErrOffsetOutOfRange {
  22. t.Error("Expected ErrOffsetOutOfRange, got:", err)
  23. }
  24. safeClose(t, consumer)
  25. }
  26. func TestConsumerHighWaterMarkOffset(t *testing.T) {
  27. setupFunctionalTest(t)
  28. defer teardownFunctionalTest(t)
  29. p, err := NewSyncProducer(kafkaBrokers, nil)
  30. if err != nil {
  31. t.Fatal(err)
  32. }
  33. defer safeClose(t, p)
  34. _, offset, err := p.SendMessage(&ProducerMessage{Topic: "test.1", Value: StringEncoder("Test")})
  35. if err != nil {
  36. t.Fatal(err)
  37. }
  38. c, err := NewConsumer(kafkaBrokers, nil)
  39. if err != nil {
  40. t.Fatal(err)
  41. }
  42. defer safeClose(t, c)
  43. pc, err := c.ConsumePartition("test.1", 0, offset)
  44. if err != nil {
  45. t.Fatal(err)
  46. }
  47. <-pc.Messages()
  48. if hwmo := pc.HighWaterMarkOffset(); hwmo != offset+1 {
  49. t.Logf("Last produced offset %d; high water mark should be one higher but found %d.", offset, hwmo)
  50. }
  51. safeClose(t, pc)
  52. }
  53. // Makes sure that messages produced by all supported producer version can be
  54. // consumed by all supported consumer versions. It relies on the KAFKA_VERSION
  55. // environment variable to provide the version of the test Kafka cluster.
  56. func TestVersionMatrix(t *testing.T) {
  57. setupFunctionalTest(t)
  58. defer teardownFunctionalTest(t)
  59. // Get the test cluster version from the environment. If there is nothing
  60. // there then assume the highest.
  61. testClusterVersion, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION"))
  62. if err != nil {
  63. testClusterVersion = MaxVersion
  64. }
  65. // Produce lot's of message with all possible combinations of supported
  66. // protocol versions and compressions.
  67. var wg sync.WaitGroup
  68. var producedMessagesMu sync.Mutex
  69. var producedMessages []*ProducerMessage
  70. for _, prodVer := range SupportedVersions {
  71. // Skip versions unsupported by the test cluster.
  72. if !testClusterVersion.IsAtLeast(prodVer) {
  73. continue
  74. }
  75. for _, compression := range []CompressionCodec{
  76. CompressionNone,
  77. CompressionGZIP,
  78. CompressionSnappy,
  79. // FIXME: lz4.Read: invalid header checksum: got 26 expected 130
  80. // CompressionLZ4,
  81. } {
  82. // lz4 compression requires Version >= V0_10_0_0
  83. if compression == CompressionLZ4 && !prodVer.IsAtLeast(V0_10_0_0) {
  84. continue
  85. }
  86. prodCfg := NewConfig()
  87. prodCfg.Version = prodVer
  88. prodCfg.Producer.Return.Successes = true
  89. prodCfg.Producer.Return.Errors = true
  90. prodCfg.Producer.Flush.MaxMessages = 17
  91. prodCfg.Producer.Compression = compression
  92. p, err := NewSyncProducer(kafkaBrokers, prodCfg)
  93. if err != nil {
  94. t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, compression, err)
  95. continue
  96. }
  97. defer safeClose(t, p)
  98. for i := 0; i < 100; i++ {
  99. msg := &ProducerMessage{
  100. Topic: "test.1",
  101. Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, compression, i)),
  102. }
  103. wg.Add(1)
  104. go func() {
  105. defer wg.Done()
  106. _, _, err := p.SendMessage(msg)
  107. if err != nil {
  108. t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err)
  109. }
  110. producedMessagesMu.Lock()
  111. producedMessages = append(producedMessages, msg)
  112. producedMessagesMu.Unlock()
  113. }()
  114. }
  115. }
  116. }
  117. wg.Wait()
  118. // Sort produced message in ascending offset order.
  119. sort.Slice(producedMessages, func(i, j int) bool {
  120. return producedMessages[i].Offset < producedMessages[j].Offset
  121. })
  122. t.Logf("*** Total produced %d, firstOffset=%d, lastOffset=%d\n",
  123. len(producedMessages), producedMessages[0].Offset, producedMessages[len(producedMessages)-1].Offset)
  124. // Consume all produced messages with all client versions supported by the
  125. // cluster.
  126. consumerVersionLoop:
  127. for _, consVer := range SupportedVersions {
  128. // Skip versions unsupported by the test cluster.
  129. if !testClusterVersion.IsAtLeast(consVer) {
  130. continue
  131. }
  132. t.Logf("*** Consuming with client version %s\n", consVer)
  133. // Create a partition consumer that should start from the first produced
  134. // message.
  135. consCfg := NewConfig()
  136. consCfg.Version = consVer
  137. c, err := NewConsumer(kafkaBrokers, consCfg)
  138. if err != nil {
  139. t.Fatal(err)
  140. }
  141. defer safeClose(t, c)
  142. pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset)
  143. if err != nil {
  144. t.Fatal(err)
  145. }
  146. defer safeClose(t, pc)
  147. // Consume as many messages as there have been produced and make sure that
  148. // order is preserved.
  149. for i, prodMsg := range producedMessages {
  150. select {
  151. case consMsg := <-pc.Messages():
  152. if consMsg.Offset != prodMsg.Offset {
  153. t.Errorf("Consumed unexpected offset: version=%s, index=%d, want=%s, got=%s",
  154. consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
  155. continue consumerVersionLoop
  156. }
  157. if string(consMsg.Value) != string(prodMsg.Value.(StringEncoder)) {
  158. t.Errorf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s",
  159. consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
  160. continue consumerVersionLoop
  161. }
  162. case <-time.After(time.Second):
  163. t.Fatalf("Timeout waiting for: index=%d, offset=%d, msg=%s", i, prodMsg.Offset, prodMsg.Value)
  164. }
  165. }
  166. }
  167. }
  168. func prodMsg2Str(prodMsg *ProducerMessage) string {
  169. return fmt.Sprintf("{offset: %d, value: %s}", prodMsg.Offset, string(prodMsg.Value.(StringEncoder)))
  170. }
  171. func consMsg2Str(consMsg *ConsumerMessage) string {
  172. return fmt.Sprintf("{offset: %d, value: %s}", consMsg.Offset, string(consMsg.Value))
  173. }