// functional_consumer_test.go
  1. package sarama
  2. import (
  3. "fmt"
  4. "math"
  5. "os"
  6. "sort"
  7. "sync"
  8. "testing"
  9. "time"
  10. "github.com/stretchr/testify/require"
  11. )
  12. func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
  13. setupFunctionalTest(t)
  14. defer teardownFunctionalTest(t)
  15. consumer, err := NewConsumer(kafkaBrokers, nil)
  16. if err != nil {
  17. t.Fatal(err)
  18. }
  19. if _, err := consumer.ConsumePartition("test.1", 0, -10); err != ErrOffsetOutOfRange {
  20. t.Error("Expected ErrOffsetOutOfRange, got:", err)
  21. }
  22. if _, err := consumer.ConsumePartition("test.1", 0, math.MaxInt64); err != ErrOffsetOutOfRange {
  23. t.Error("Expected ErrOffsetOutOfRange, got:", err)
  24. }
  25. safeClose(t, consumer)
  26. }
  27. func TestConsumerHighWaterMarkOffset(t *testing.T) {
  28. setupFunctionalTest(t)
  29. defer teardownFunctionalTest(t)
  30. p, err := NewSyncProducer(kafkaBrokers, nil)
  31. if err != nil {
  32. t.Fatal(err)
  33. }
  34. defer safeClose(t, p)
  35. _, offset, err := p.SendMessage(&ProducerMessage{Topic: "test.1", Value: StringEncoder("Test")})
  36. if err != nil {
  37. t.Fatal(err)
  38. }
  39. c, err := NewConsumer(kafkaBrokers, nil)
  40. if err != nil {
  41. t.Fatal(err)
  42. }
  43. defer safeClose(t, c)
  44. pc, err := c.ConsumePartition("test.1", 0, offset)
  45. if err != nil {
  46. t.Fatal(err)
  47. }
  48. <-pc.Messages()
  49. if hwmo := pc.HighWaterMarkOffset(); hwmo != offset+1 {
  50. t.Logf("Last produced offset %d; high water mark should be one higher but found %d.", offset, hwmo)
  51. }
  52. safeClose(t, pc)
  53. }
  54. // Makes sure that messages produced by all supported client versions/
  55. // compression codecs (except LZ4) combinations can be consumed by all
  56. // supported consumer versions. It relies on the KAFKA_VERSION environment
  57. // variable to provide the version of the test Kafka cluster.
  58. //
  59. // Note that LZ4 codec was introduced in v0.10.0.0 and therefore is excluded
  60. // from this test case. It has a similar version matrix test case below that
  61. // only checks versions from v0.10.0.0 until KAFKA_VERSION.
  62. func TestVersionMatrix(t *testing.T) {
  63. setupFunctionalTest(t)
  64. defer teardownFunctionalTest(t)
  65. // Produce lot's of message with all possible combinations of supported
  66. // protocol versions and compressions for the except of LZ4.
  67. testVersions := versionRange(V0_8_2_0)
  68. allCodecsButLZ4 := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy}
  69. producedMessages := produceMsgs(t, testVersions, allCodecsButLZ4, 17, 100, false)
  70. // When/Then
  71. consumeMsgs(t, testVersions, producedMessages)
  72. }
  73. // Support for LZ4 codec was introduced in v0.10.0.0 so a version matrix to
  74. // test LZ4 should start with v0.10.0.0.
  75. func TestVersionMatrixLZ4(t *testing.T) {
  76. setupFunctionalTest(t)
  77. defer teardownFunctionalTest(t)
  78. // Produce lot's of message with all possible combinations of supported
  79. // protocol versions starting with v0.10 (first where LZ4 was supported)
  80. // and all possible compressions.
  81. testVersions := versionRange(V0_10_0_0)
  82. allCodecs := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy, CompressionLZ4}
  83. producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false)
  84. // When/Then
  85. consumeMsgs(t, testVersions, producedMessages)
  86. }
  87. // Support for zstd codec was introduced in v2.1.0.0
  88. func TestVersionMatrixZstd(t *testing.T) {
  89. setupFunctionalTest(t)
  90. defer teardownFunctionalTest(t)
  91. // Produce lot's of message with all possible combinations of supported
  92. // protocol versions starting with v2.1.0.0 (first where zstd was supported)
  93. testVersions := versionRange(V2_1_0_0)
  94. allCodecs := []CompressionCodec{CompressionZSTD}
  95. producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false)
  96. // When/Then
  97. consumeMsgs(t, testVersions, producedMessages)
  98. }
  99. func TestVersionMatrixIdempotent(t *testing.T) {
  100. setupFunctionalTest(t)
  101. defer teardownFunctionalTest(t)
  102. // Produce lot's of message with all possible combinations of supported
  103. // protocol versions starting with v0.11 (first where idempotent was supported)
  104. testVersions := versionRange(V0_11_0_0)
  105. producedMessages := produceMsgs(t, testVersions, []CompressionCodec{CompressionNone}, 17, 100, true)
  106. // When/Then
  107. consumeMsgs(t, testVersions, producedMessages)
  108. }
  109. func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
  110. checkKafkaVersion(t, "0.11.0")
  111. setupFunctionalTest(t)
  112. defer teardownFunctionalTest(t)
  113. config := NewConfig()
  114. config.Consumer.IsolationLevel = ReadCommitted
  115. config.Version = V0_11_0_0
  116. consumer, err := NewConsumer(kafkaBrokers, config)
  117. if err != nil {
  118. t.Fatal(err)
  119. }
  120. pc, err := consumer.ConsumePartition("uncommitted-topic-test-4", 0, OffsetOldest)
  121. require.NoError(t, err)
  122. msgChannel := pc.Messages()
  123. for i := 1; i <= 6; i++ {
  124. msg := <-msgChannel
  125. require.Equal(t, fmt.Sprintf("Committed %v", i), string(msg.Value))
  126. }
  127. }
  128. func prodMsg2Str(prodMsg *ProducerMessage) string {
  129. return fmt.Sprintf("{offset: %d, value: %s}", prodMsg.Offset, string(prodMsg.Value.(StringEncoder)))
  130. }
  131. func consMsg2Str(consMsg *ConsumerMessage) string {
  132. return fmt.Sprintf("{offset: %d, value: %s}", consMsg.Offset, string(consMsg.Value))
  133. }
  134. func versionRange(lower KafkaVersion) []KafkaVersion {
  135. // Get the test cluster version from the environment. If there is nothing
  136. // there then assume the highest.
  137. upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION"))
  138. if err != nil {
  139. upper = MaxVersion
  140. }
  141. versions := make([]KafkaVersion, 0, len(SupportedVersions))
  142. for _, v := range SupportedVersions {
  143. if !v.IsAtLeast(lower) {
  144. continue
  145. }
  146. if !upper.IsAtLeast(v) {
  147. return versions
  148. }
  149. versions = append(versions, v)
  150. }
  151. return versions
  152. }
  153. func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage {
  154. var wg sync.WaitGroup
  155. var producedMessagesMu sync.Mutex
  156. var producedMessages []*ProducerMessage
  157. for _, prodVer := range clientVersions {
  158. for _, codec := range codecs {
  159. prodCfg := NewConfig()
  160. prodCfg.Version = prodVer
  161. prodCfg.Producer.Return.Successes = true
  162. prodCfg.Producer.Return.Errors = true
  163. prodCfg.Producer.Flush.MaxMessages = flush
  164. prodCfg.Producer.Compression = codec
  165. prodCfg.Producer.Idempotent = idempotent
  166. if idempotent {
  167. prodCfg.Producer.RequiredAcks = WaitForAll
  168. prodCfg.Net.MaxOpenRequests = 1
  169. }
  170. p, err := NewSyncProducer(kafkaBrokers, prodCfg)
  171. if err != nil {
  172. t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
  173. continue
  174. }
  175. defer safeClose(t, p)
  176. for i := 0; i < countPerVerCodec; i++ {
  177. msg := &ProducerMessage{
  178. Topic: "test.1",
  179. Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)),
  180. }
  181. wg.Add(1)
  182. go func() {
  183. defer wg.Done()
  184. _, _, err := p.SendMessage(msg)
  185. if err != nil {
  186. t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err)
  187. }
  188. producedMessagesMu.Lock()
  189. producedMessages = append(producedMessages, msg)
  190. producedMessagesMu.Unlock()
  191. }()
  192. }
  193. }
  194. }
  195. wg.Wait()
  196. // Sort produced message in ascending offset order.
  197. sort.Slice(producedMessages, func(i, j int) bool {
  198. return producedMessages[i].Offset < producedMessages[j].Offset
  199. })
  200. t.Logf("*** Total produced %d, firstOffset=%d, lastOffset=%d\n",
  201. len(producedMessages), producedMessages[0].Offset, producedMessages[len(producedMessages)-1].Offset)
  202. return producedMessages
  203. }
// consumeMsgs consumes every message in producedMessages from partition 0
// of topic "test.1", once per client version in clientVersions, and checks
// that offsets and values arrive in exactly the produced order.
func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) {
	// Consume all produced messages with all client versions supported by the
	// cluster.
consumerVersionLoop:
	for _, consVer := range clientVersions {
		t.Logf("*** Consuming with client version %s\n", consVer)
		// Create a partition consumer that should start from the first produced
		// message.
		consCfg := NewConfig()
		consCfg.Version = consVer
		c, err := NewConsumer(kafkaBrokers, consCfg)
		if err != nil {
			t.Fatal(err)
		}
		// NOTE(review): deferred to function exit, not loop iteration — one
		// consumer per version stays open until all versions are checked.
		defer safeClose(t, c)
		pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset)
		if err != nil {
			t.Fatal(err)
		}
		defer safeClose(t, pc)
		// Consume as many messages as there have been produced and make sure that
		// order is preserved.
		for i, prodMsg := range producedMessages {
			select {
			case consMsg := <-pc.Messages():
				if consMsg.Offset != prodMsg.Offset {
					t.Errorf("Consumed unexpected offset: version=%s, index=%d, want=%s, got=%s",
						consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
					// An offset mismatch invalidates the rest of this
					// version's stream; move on to the next client version.
					continue consumerVersionLoop
				}
				if string(consMsg.Value) != string(prodMsg.Value.(StringEncoder)) {
					t.Errorf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s",
						consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
					continue consumerVersionLoop
				}
			case <-time.After(3 * time.Second):
				// No message within the allowance: abort the whole test.
				t.Fatalf("Timeout waiting for: index=%d, offset=%d, msg=%s", i, prodMsg.Offset, prodMsg.Value)
			}
		}
	}
}