functional_consumer_test.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package sarama
  2. import (
  3. "fmt"
  4. "math"
  5. "os"
  6. "sort"
  7. "sync"
  8. "testing"
  9. "time"
  10. )
  11. func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
  12. setupFunctionalTest(t)
  13. defer teardownFunctionalTest(t)
  14. consumer, err := NewConsumer(kafkaBrokers, nil)
  15. if err != nil {
  16. t.Fatal(err)
  17. }
  18. if _, err := consumer.ConsumePartition("test.1", 0, -10); err != ErrOffsetOutOfRange {
  19. t.Error("Expected ErrOffsetOutOfRange, got:", err)
  20. }
  21. if _, err := consumer.ConsumePartition("test.1", 0, math.MaxInt64); err != ErrOffsetOutOfRange {
  22. t.Error("Expected ErrOffsetOutOfRange, got:", err)
  23. }
  24. safeClose(t, consumer)
  25. }
  26. func TestConsumerHighWaterMarkOffset(t *testing.T) {
  27. setupFunctionalTest(t)
  28. defer teardownFunctionalTest(t)
  29. p, err := NewSyncProducer(kafkaBrokers, nil)
  30. if err != nil {
  31. t.Fatal(err)
  32. }
  33. defer safeClose(t, p)
  34. _, offset, err := p.SendMessage(&ProducerMessage{Topic: "test.1", Value: StringEncoder("Test")})
  35. if err != nil {
  36. t.Fatal(err)
  37. }
  38. c, err := NewConsumer(kafkaBrokers, nil)
  39. if err != nil {
  40. t.Fatal(err)
  41. }
  42. defer safeClose(t, c)
  43. pc, err := c.ConsumePartition("test.1", 0, offset)
  44. if err != nil {
  45. t.Fatal(err)
  46. }
  47. <-pc.Messages()
  48. if hwmo := pc.HighWaterMarkOffset(); hwmo != offset+1 {
  49. t.Logf("Last produced offset %d; high water mark should be one higher but found %d.", offset, hwmo)
  50. }
  51. safeClose(t, pc)
  52. }
  53. // Makes sure that messages produced by all supported client versions/
  54. // compression codecs (except LZ4) combinations can be consumed by all
  55. // supported consumer versions. It relies on the KAFKA_VERSION environment
  56. // variable to provide the version of the test Kafka cluster.
  57. //
  58. // Note that LZ4 codec was introduced in v0.10.0.0 and therefore is excluded
  59. // from this test case. It has a similar version matrix test case below that
  60. // only checks versions from v0.10.0.0 until KAFKA_VERSION.
  61. func TestVersionMatrix(t *testing.T) {
  62. setupFunctionalTest(t)
  63. defer teardownFunctionalTest(t)
  64. // Produce lot's of message with all possible combinations of supported
  65. // protocol versions and compressions for the except of LZ4.
  66. testVersions := versionRange(V0_8_2_0)
  67. allCodecsButLZ4 := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy}
  68. producedMessages := produceMsgs(t, testVersions, allCodecsButLZ4, 17, 100)
  69. // When/Then
  70. consumeMsgs(t, testVersions, producedMessages)
  71. }
  72. // Support for LZ4 codec was introduced in v0.10.0.0 so a version matrix to
  73. // test LZ4 should start with v0.10.0.0.
  74. func TestVersionMatrixLZ4(t *testing.T) {
  75. setupFunctionalTest(t)
  76. defer teardownFunctionalTest(t)
  77. // Produce lot's of message with all possible combinations of supported
  78. // protocol versions starting with v0.10 (first where LZ4 was supported)
  79. // and all possible compressions.
  80. testVersions := versionRange(V0_10_0_0)
  81. allCodecs := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy, CompressionLZ4}
  82. producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100)
  83. // When/Then
  84. consumeMsgs(t, testVersions, producedMessages)
  85. }
  86. func prodMsg2Str(prodMsg *ProducerMessage) string {
  87. return fmt.Sprintf("{offset: %d, value: %s}", prodMsg.Offset, string(prodMsg.Value.(StringEncoder)))
  88. }
  89. func consMsg2Str(consMsg *ConsumerMessage) string {
  90. return fmt.Sprintf("{offset: %d, value: %s}", consMsg.Offset, string(consMsg.Value))
  91. }
  92. func versionRange(lower KafkaVersion) []KafkaVersion {
  93. // Get the test cluster version from the environment. If there is nothing
  94. // there then assume the highest.
  95. upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION"))
  96. if err != nil {
  97. upper = MaxVersion
  98. }
  99. versions := make([]KafkaVersion, 0, len(SupportedVersions))
  100. for _, v := range SupportedVersions {
  101. if !v.IsAtLeast(lower) {
  102. continue
  103. }
  104. if !upper.IsAtLeast(v) {
  105. return versions
  106. }
  107. versions = append(versions, v)
  108. }
  109. return versions
  110. }
  111. func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int) []*ProducerMessage {
  112. var wg sync.WaitGroup
  113. var producedMessagesMu sync.Mutex
  114. var producedMessages []*ProducerMessage
  115. for _, prodVer := range clientVersions {
  116. for _, codec := range codecs {
  117. prodCfg := NewConfig()
  118. prodCfg.Version = prodVer
  119. prodCfg.Producer.Return.Successes = true
  120. prodCfg.Producer.Return.Errors = true
  121. prodCfg.Producer.Flush.MaxMessages = flush
  122. prodCfg.Producer.Compression = codec
  123. p, err := NewSyncProducer(kafkaBrokers, prodCfg)
  124. if err != nil {
  125. t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
  126. continue
  127. }
  128. defer safeClose(t, p)
  129. for i := 0; i < countPerVerCodec; i++ {
  130. msg := &ProducerMessage{
  131. Topic: "test.1",
  132. Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)),
  133. }
  134. wg.Add(1)
  135. go func() {
  136. defer wg.Done()
  137. _, _, err := p.SendMessage(msg)
  138. if err != nil {
  139. t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err)
  140. }
  141. producedMessagesMu.Lock()
  142. producedMessages = append(producedMessages, msg)
  143. producedMessagesMu.Unlock()
  144. }()
  145. }
  146. }
  147. }
  148. wg.Wait()
  149. // Sort produced message in ascending offset order.
  150. sort.Slice(producedMessages, func(i, j int) bool {
  151. return producedMessages[i].Offset < producedMessages[j].Offset
  152. })
  153. t.Logf("*** Total produced %d, firstOffset=%d, lastOffset=%d\n",
  154. len(producedMessages), producedMessages[0].Offset, producedMessages[len(producedMessages)-1].Offset)
  155. return producedMessages
  156. }
  157. func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) {
  158. // Consume all produced messages with all client versions supported by the
  159. // cluster.
  160. consumerVersionLoop:
  161. for _, consVer := range clientVersions {
  162. t.Logf("*** Consuming with client version %s\n", consVer)
  163. // Create a partition consumer that should start from the first produced
  164. // message.
  165. consCfg := NewConfig()
  166. consCfg.Version = consVer
  167. c, err := NewConsumer(kafkaBrokers, consCfg)
  168. if err != nil {
  169. t.Fatal(err)
  170. }
  171. defer safeClose(t, c)
  172. pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset)
  173. if err != nil {
  174. t.Fatal(err)
  175. }
  176. defer safeClose(t, pc)
  177. // Consume as many messages as there have been produced and make sure that
  178. // order is preserved.
  179. for i, prodMsg := range producedMessages {
  180. select {
  181. case consMsg := <-pc.Messages():
  182. if consMsg.Offset != prodMsg.Offset {
  183. t.Errorf("Consumed unexpected offset: version=%s, index=%d, want=%s, got=%s",
  184. consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
  185. continue consumerVersionLoop
  186. }
  187. if string(consMsg.Value) != string(prodMsg.Value.(StringEncoder)) {
  188. t.Errorf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s",
  189. consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
  190. continue consumerVersionLoop
  191. }
  192. case <-time.After(3 * time.Second):
  193. t.Fatalf("Timeout waiting for: index=%d, offset=%d, msg=%s", i, prodMsg.Offset, prodMsg.Value)
  194. }
  195. }
  196. }
  197. }