functional_consumer_test.go

//+build functional

package sarama

import (
	"fmt"
	"math"
	"os"
	"sort"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
	if err != nil {
		t.Fatal(err)
	}
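
	// Both probes below must fail: an offset below the oldest available
	// message and one far beyond the newest are equally out of range.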
	if _, err := consumer.ConsumePartition("test.1", 0, -10); err != ErrOffsetOutOfRange {
		t.Error("Expected ErrOffsetOutOfRange, got:", err)
	}
	if _, err := consumer.ConsumePartition("test.1", 0, math.MaxInt64); err != ErrOffsetOutOfRange {
		t.Error("Expected ErrOffsetOutOfRange, got:", err)
	}

	safeClose(t, consumer)
}

func TestConsumerHighWaterMarkOffset(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, p)

	_, offset, err := p.SendMessage(&ProducerMessage{Topic: "test.1", Value: StringEncoder("Test")})
	if err != nil {
		t.Fatal(err)
	}

	c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, c)

	pc, err := c.ConsumePartition("test.1", 0, offset)
	if err != nil {
		t.Fatal(err)
	}

	<-pc.Messages()
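
	// The high water mark is one greater than the offset of the last
	// committed message, so immediately after consuming the message produced
	// at `offset` it should normally read offset+1. A mismatch is only
	// logged, not treated as a test failure.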
	if hwmo := pc.HighWaterMarkOffset(); hwmo != offset+1 {
		t.Logf("Last produced offset %d; high water mark should be one higher but found %d.", offset, hwmo)
	}

	safeClose(t, pc)
}

// Makes sure that messages produced by all combinations of supported client
// versions and compression codecs (except LZ4) can be consumed by all
// supported consumer versions. It relies on the KAFKA_VERSION environment
// variable to provide the version of the test Kafka cluster.
//
// Note that the LZ4 codec was introduced in v0.10.0.0 and is therefore
// excluded from this test case. A similar version matrix test case below
// checks only versions from v0.10.0.0 up to KAFKA_VERSION.
func TestVersionMatrix(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	// Produce lots of messages with all possible combinations of supported
	// protocol versions and compression codecs, with the exception of LZ4.
	testVersions := versionRange(V0_8_2_0)
	allCodecsButLZ4 := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy}
	producedMessages := produceMsgs(t, testVersions, allCodecsButLZ4, 17, 100, false)

	// When/Then
	consumeMsgs(t, testVersions, producedMessages)
}
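
// As a hedged sketch (the exact cluster bootstrap is environment-specific),
// running this matrix against a hypothetical local 2.8.1 test cluster might
// look like:
//
//	KAFKA_VERSION=2.8.1 go test -tags functional -run 'TestVersionMatrix$'
//
// The functional build tag at the top of this file keeps these tests out of
// ordinary `go test` runs.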

// Support for the LZ4 codec was introduced in v0.10.0.0, so a version matrix
// that tests LZ4 should start with v0.10.0.0.
func TestVersionMatrixLZ4(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	// Produce lots of messages with all possible combinations of supported
	// protocol versions starting with v0.10 (the first that supported LZ4)
	// and all possible compression codecs.
	testVersions := versionRange(V0_10_0_0)
	allCodecs := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy, CompressionLZ4}
	producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false)

	// When/Then
	consumeMsgs(t, testVersions, producedMessages)
}

// Support for the zstd codec was introduced in v2.1.0.0.
func TestVersionMatrixZstd(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	// Produce lots of messages with all supported protocol versions starting
	// with v2.1.0.0 (the first that supported zstd).
	testVersions := versionRange(V2_1_0_0)
	allCodecs := []CompressionCodec{CompressionZSTD}
	producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false)

	// When/Then
	consumeMsgs(t, testVersions, producedMessages)
}

func TestVersionMatrixIdempotent(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	// Produce lots of messages with all supported protocol versions starting
	// with v0.11 (the first that supported idempotent production).
	testVersions := versionRange(V0_11_0_0)
	producedMessages := produceMsgs(t, testVersions, []CompressionCodec{CompressionNone}, 17, 100, true)

	// When/Then
	consumeMsgs(t, testVersions, producedMessages)
}

func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
	checkKafkaVersion(t, "0.11.0")
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	config := NewConfig()
	config.Consumer.IsolationLevel = ReadCommitted
	config.Version = V0_11_0_0

	consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := consumer.ConsumePartition("uncommitted-topic-test-4", 0, OffsetOldest)
	require.NoError(t, err)

	msgChannel := pc.Messages()
	for i := 1; i <= 6; i++ {
		msg := <-msgChannel
		require.Equal(t, fmt.Sprintf("Committed %v", i), string(msg.Value))
	}
}

func prodMsg2Str(prodMsg *ProducerMessage) string {
	return fmt.Sprintf("{offset: %d, value: %s}", prodMsg.Offset, string(prodMsg.Value.(StringEncoder)))
}

func consMsg2Str(consMsg *ConsumerMessage) string {
	return fmt.Sprintf("{offset: %d, value: %s}", consMsg.Offset, string(consMsg.Value))
}

func versionRange(lower KafkaVersion) []KafkaVersion {
	// Take the test cluster version from the environment; if it is not set,
	// assume the highest supported version.
	upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION"))
	if err != nil {
		upper = MaxVersion
	}

	versions := make([]KafkaVersion, 0, len(SupportedVersions))
	for _, v := range SupportedVersions {
		if !v.IsAtLeast(lower) {
			continue
		}
		if !upper.IsAtLeast(v) {
			return versions
		}
		versions = append(versions, v)
	}
	return versions
}
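
// For illustration (a hedged sketch; the exact contents of SupportedVersions
// depend on the Sarama release): with KAFKA_VERSION=1.0.0 in the environment,
// versionRange(V0_10_0_0) yields every supported version from V0_10_0_0 up to
// and including V1_0_0_0, and the matrix tests above iterate over that slice
// for both producing and consuming:
//
//	for _, v := range versionRange(V0_10_0_0) {
//		fmt.Println(v) // each client version the matrix will exercise
//	}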

func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage {
	var wg sync.WaitGroup
	var producedMessagesMu sync.Mutex
	var producedMessages []*ProducerMessage
	for _, prodVer := range clientVersions {
		for _, codec := range codecs {
			prodCfg := NewConfig()
			prodCfg.Version = prodVer
			prodCfg.Producer.Return.Successes = true
			prodCfg.Producer.Return.Errors = true
			prodCfg.Producer.Flush.MaxMessages = flush
			prodCfg.Producer.Compression = codec
			prodCfg.Producer.Idempotent = idempotent
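			// Sarama's idempotent producer requires acks from all in-sync
			// replicas and a single in-flight request per broker, so that
			// retries cannot reorder sequence numbers.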
			if idempotent {
				prodCfg.Producer.RequiredAcks = WaitForAll
				prodCfg.Net.MaxOpenRequests = 1
			}

			p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
			if err != nil {
				t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
				continue
			}
			defer safeClose(t, p)
			for i := 0; i < countPerVerCodec; i++ {
				msg := &ProducerMessage{
					Topic: "test.1",
					Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)),
				}
				wg.Add(1)
				go func() {
					defer wg.Done()
					_, _, err := p.SendMessage(msg)
					if err != nil {
						t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err)
					}
					producedMessagesMu.Lock()
					producedMessages = append(producedMessages, msg)
					producedMessagesMu.Unlock()
				}()
			}
		}
	}
	wg.Wait()

	// Sort the produced messages in ascending offset order.
	sort.Slice(producedMessages, func(i, j int) bool {
		return producedMessages[i].Offset < producedMessages[j].Offset
	})
	t.Logf("*** Total produced %d, firstOffset=%d, lastOffset=%d\n",
		len(producedMessages), producedMessages[0].Offset, producedMessages[len(producedMessages)-1].Offset)
	return producedMessages
}

func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) {
	// Consume all produced messages with all client versions supported by the
	// cluster.
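	// A mismatch below aborts verification only for the current client
	// version; the labeled continue skips ahead to the next version in the
	// matrix.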
consumerVersionLoop:
	for _, consVer := range clientVersions {
		t.Logf("*** Consuming with client version %s\n", consVer)
		// Create a partition consumer that should start from the first
		// produced message.
		consCfg := NewConfig()
		consCfg.Version = consVer
		c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg)
		if err != nil {
			t.Fatal(err)
		}
		defer safeClose(t, c)
		pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset)
		if err != nil {
			t.Fatal(err)
		}
		defer safeClose(t, pc)
		// Consume as many messages as have been produced and make sure that
		// order is preserved.
		for i, prodMsg := range producedMessages {
			select {
			case consMsg := <-pc.Messages():
				if consMsg.Offset != prodMsg.Offset {
					t.Errorf("Consumed unexpected offset: version=%s, index=%d, want=%s, got=%s",
						consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
					continue consumerVersionLoop
				}
				if string(consMsg.Value) != string(prodMsg.Value.(StringEncoder)) {
					t.Errorf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s",
						consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
					continue consumerVersionLoop
				}
			case <-time.After(3 * time.Second):
				t.Fatalf("Timeout waiting for: index=%d, offset=%d, msg=%s", i, prodMsg.Offset, prodMsg.Value)
			}
		}
	}
}