functional_consumer_test.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. package sarama
  2. import (
  3. "fmt"
  4. "math"
  5. "os"
  6. "sort"
  7. "sync"
  8. "testing"
  9. "time"
  10. "github.com/stretchr/testify/require"
  11. )
  12. func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
  13. setupFunctionalTest(t)
  14. defer teardownFunctionalTest(t)
  15. consumer, err := NewConsumer(kafkaBrokers, nil)
  16. if err != nil {
  17. t.Fatal(err)
  18. }
  19. if _, err := consumer.ConsumePartition("test.1", 0, -10); err != ErrOffsetOutOfRange {
  20. t.Error("Expected ErrOffsetOutOfRange, got:", err)
  21. }
  22. if _, err := consumer.ConsumePartition("test.1", 0, math.MaxInt64); err != ErrOffsetOutOfRange {
  23. t.Error("Expected ErrOffsetOutOfRange, got:", err)
  24. }
  25. safeClose(t, consumer)
  26. }
  27. func TestConsumerHighWaterMarkOffset(t *testing.T) {
  28. setupFunctionalTest(t)
  29. defer teardownFunctionalTest(t)
  30. p, err := NewSyncProducer(kafkaBrokers, nil)
  31. if err != nil {
  32. t.Fatal(err)
  33. }
  34. defer safeClose(t, p)
  35. _, offset, err := p.SendMessage(&ProducerMessage{Topic: "test.1", Value: StringEncoder("Test")})
  36. if err != nil {
  37. t.Fatal(err)
  38. }
  39. c, err := NewConsumer(kafkaBrokers, nil)
  40. if err != nil {
  41. t.Fatal(err)
  42. }
  43. defer safeClose(t, c)
  44. pc, err := c.ConsumePartition("test.1", 0, offset)
  45. if err != nil {
  46. t.Fatal(err)
  47. }
  48. <-pc.Messages()
  49. if hwmo := pc.HighWaterMarkOffset(); hwmo != offset+1 {
  50. t.Logf("Last produced offset %d; high water mark should be one higher but found %d.", offset, hwmo)
  51. }
  52. safeClose(t, pc)
  53. }
  54. // Makes sure that messages produced by all supported client versions/
  55. // compression codecs (except LZ4) combinations can be consumed by all
  56. // supported consumer versions. It relies on the KAFKA_VERSION environment
  57. // variable to provide the version of the test Kafka cluster.
  58. //
  59. // Note that LZ4 codec was introduced in v0.10.0.0 and therefore is excluded
  60. // from this test case. It has a similar version matrix test case below that
  61. // only checks versions from v0.10.0.0 until KAFKA_VERSION.
  62. func TestVersionMatrix(t *testing.T) {
  63. setupFunctionalTest(t)
  64. defer teardownFunctionalTest(t)
  65. // Produce lot's of message with all possible combinations of supported
  66. // protocol versions and compressions for the except of LZ4.
  67. testVersions := versionRange(V0_8_2_0)
  68. allCodecsButLZ4 := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy}
  69. producedMessages := produceMsgs(t, testVersions, allCodecsButLZ4, 17, 100, false)
  70. // When/Then
  71. consumeMsgs(t, testVersions, producedMessages)
  72. }
  73. // Support for LZ4 codec was introduced in v0.10.0.0 so a version matrix to
  74. // test LZ4 should start with v0.10.0.0.
  75. func TestVersionMatrixLZ4(t *testing.T) {
  76. setupFunctionalTest(t)
  77. defer teardownFunctionalTest(t)
  78. // Produce lot's of message with all possible combinations of supported
  79. // protocol versions starting with v0.10 (first where LZ4 was supported)
  80. // and all possible compressions.
  81. testVersions := versionRange(V0_10_0_0)
  82. allCodecs := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy, CompressionLZ4}
  83. producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false)
  84. // When/Then
  85. consumeMsgs(t, testVersions, producedMessages)
  86. }
  87. func TestVersionMatrixIdempotent(t *testing.T) {
  88. setupFunctionalTest(t)
  89. defer teardownFunctionalTest(t)
  90. // Produce lot's of message with all possible combinations of supported
  91. // protocol versions starting with v0.11 (first where idempotent was supported)
  92. testVersions := versionRange(V0_11_0_0)
  93. producedMessages := produceMsgs(t, testVersions, []CompressionCodec{CompressionNone}, 17, 100, true)
  94. // When/Then
  95. consumeMsgs(t, testVersions, producedMessages)
  96. }
  97. func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
  98. checkKafkaVersion(t, "0.11.0")
  99. setupFunctionalTest(t)
  100. defer teardownFunctionalTest(t)
  101. config := NewConfig()
  102. config.Consumer.IsolationLevel = ReadCommitted
  103. config.Version = V0_11_0_0
  104. consumer, err := NewConsumer(kafkaBrokers, config)
  105. if err != nil {
  106. t.Fatal(err)
  107. }
  108. pc, err := consumer.ConsumePartition("uncommitted-topic-test-4", 0, OffsetOldest)
  109. require.NoError(t, err)
  110. msgChannel := pc.Messages()
  111. for i := 1; i <= 6; i++ {
  112. msg := <-msgChannel
  113. require.Equal(t, fmt.Sprintf("Committed %v", i), string(msg.Value))
  114. }
  115. }
  116. func prodMsg2Str(prodMsg *ProducerMessage) string {
  117. return fmt.Sprintf("{offset: %d, value: %s}", prodMsg.Offset, string(prodMsg.Value.(StringEncoder)))
  118. }
  119. func consMsg2Str(consMsg *ConsumerMessage) string {
  120. return fmt.Sprintf("{offset: %d, value: %s}", consMsg.Offset, string(consMsg.Value))
  121. }
  122. func versionRange(lower KafkaVersion) []KafkaVersion {
  123. // Get the test cluster version from the environment. If there is nothing
  124. // there then assume the highest.
  125. upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION"))
  126. if err != nil {
  127. upper = MaxVersion
  128. }
  129. versions := make([]KafkaVersion, 0, len(SupportedVersions))
  130. for _, v := range SupportedVersions {
  131. if !v.IsAtLeast(lower) {
  132. continue
  133. }
  134. if !upper.IsAtLeast(v) {
  135. return versions
  136. }
  137. versions = append(versions, v)
  138. }
  139. return versions
  140. }
  141. func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage {
  142. var wg sync.WaitGroup
  143. var producedMessagesMu sync.Mutex
  144. var producedMessages []*ProducerMessage
  145. for _, prodVer := range clientVersions {
  146. for _, codec := range codecs {
  147. prodCfg := NewConfig()
  148. prodCfg.Version = prodVer
  149. prodCfg.Producer.Return.Successes = true
  150. prodCfg.Producer.Return.Errors = true
  151. prodCfg.Producer.Flush.MaxMessages = flush
  152. prodCfg.Producer.Compression = codec
  153. prodCfg.Producer.Idempotent = idempotent
  154. if idempotent {
  155. prodCfg.Producer.RequiredAcks = WaitForAll
  156. prodCfg.Net.MaxOpenRequests = 1
  157. }
  158. p, err := NewSyncProducer(kafkaBrokers, prodCfg)
  159. if err != nil {
  160. t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
  161. continue
  162. }
  163. defer safeClose(t, p)
  164. for i := 0; i < countPerVerCodec; i++ {
  165. msg := &ProducerMessage{
  166. Topic: "test.1",
  167. Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)),
  168. }
  169. wg.Add(1)
  170. go func() {
  171. defer wg.Done()
  172. _, _, err := p.SendMessage(msg)
  173. if err != nil {
  174. t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err)
  175. }
  176. producedMessagesMu.Lock()
  177. producedMessages = append(producedMessages, msg)
  178. producedMessagesMu.Unlock()
  179. }()
  180. }
  181. }
  182. }
  183. wg.Wait()
  184. // Sort produced message in ascending offset order.
  185. sort.Slice(producedMessages, func(i, j int) bool {
  186. return producedMessages[i].Offset < producedMessages[j].Offset
  187. })
  188. t.Logf("*** Total produced %d, firstOffset=%d, lastOffset=%d\n",
  189. len(producedMessages), producedMessages[0].Offset, producedMessages[len(producedMessages)-1].Offset)
  190. return producedMessages
  191. }
  192. func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) {
  193. // Consume all produced messages with all client versions supported by the
  194. // cluster.
  195. consumerVersionLoop:
  196. for _, consVer := range clientVersions {
  197. t.Logf("*** Consuming with client version %s\n", consVer)
  198. // Create a partition consumer that should start from the first produced
  199. // message.
  200. consCfg := NewConfig()
  201. consCfg.Version = consVer
  202. c, err := NewConsumer(kafkaBrokers, consCfg)
  203. if err != nil {
  204. t.Fatal(err)
  205. }
  206. defer safeClose(t, c)
  207. pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset)
  208. if err != nil {
  209. t.Fatal(err)
  210. }
  211. defer safeClose(t, pc)
  212. // Consume as many messages as there have been produced and make sure that
  213. // order is preserved.
  214. for i, prodMsg := range producedMessages {
  215. select {
  216. case consMsg := <-pc.Messages():
  217. if consMsg.Offset != prodMsg.Offset {
  218. t.Errorf("Consumed unexpected offset: version=%s, index=%d, want=%s, got=%s",
  219. consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
  220. continue consumerVersionLoop
  221. }
  222. if string(consMsg.Value) != string(prodMsg.Value.(StringEncoder)) {
  223. t.Errorf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s",
  224. consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
  225. continue consumerVersionLoop
  226. }
  227. case <-time.After(3 * time.Second):
  228. t.Fatalf("Timeout waiting for: index=%d, offset=%d, msg=%s", i, prodMsg.Offset, prodMsg.Value)
  229. }
  230. }
  231. }
  232. }