// +build functional

package sarama

import (
	"fmt"
	"math"
	"os"
	"sort"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)
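
// TestFuncConsumerOffsetOutOfRange checks that requesting an offset outside
// the partition's valid range surfaces ErrOffsetOutOfRange.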
func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
	if err != nil {
		t.Fatal(err)
	}

	if _, err := consumer.ConsumePartition("test.1", 0, -10); err != ErrOffsetOutOfRange {
		t.Error("Expected ErrOffsetOutOfRange, got:", err)
	}
	if _, err := consumer.ConsumePartition("test.1", 0, math.MaxInt64); err != ErrOffsetOutOfRange {
		t.Error("Expected ErrOffsetOutOfRange, got:", err)
	}

	safeClose(t, consumer)
}
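
// TestConsumerHighWaterMarkOffset produces a single message and checks that
// the partition consumer reports a high water mark one past its offset.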
func TestConsumerHighWaterMarkOffset(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, p)

	_, offset, err := p.SendMessage(&ProducerMessage{Topic: "test.1", Value: StringEncoder("Test")})
	if err != nil {
		t.Fatal(err)
	}

	c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, c)

	pc, err := c.ConsumePartition("test.1", 0, offset)
	if err != nil {
		t.Fatal(err)
	}

	<-pc.Messages()

	// The high water mark should be one past the last produced offset. Note
	// that a mismatch is only logged, not treated as a failure.
	if hwmo := pc.HighWaterMarkOffset(); hwmo != offset+1 {
		t.Logf("Last produced offset %d; high water mark should be one higher but found %d.", offset, hwmo)
	}

	safeClose(t, pc)
}

// Makes sure that messages produced by all combinations of supported client
// versions and compression codecs (except LZ4) can be consumed by all
// supported consumer versions. It relies on the KAFKA_VERSION environment
// variable to provide the version of the test Kafka cluster.
//
// Note that the LZ4 codec was introduced in v0.10.0.0 and is therefore
// excluded from this test case. A similar version matrix test case below
// checks only versions from v0.10.0.0 up to KAFKA_VERSION.
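//
// An example invocation (the KAFKA_VERSION value is a placeholder):
//
//	KAFKA_VERSION=2.8.1 go test -tags functional -run TestVersionMatrix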
func TestVersionMatrix(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	// Produce lots of messages with all possible combinations of supported
	// protocol versions and compression codecs, except for LZ4.
	testVersions := versionRange(V0_8_2_0)
	allCodecsButLZ4 := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy}
	producedMessages := produceMsgs(t, testVersions, allCodecsButLZ4, 17, 100, false)

	// When/Then
	consumeMsgs(t, testVersions, producedMessages)
}

// Support for the LZ4 codec was introduced in v0.10.0.0, so the version
// matrix for LZ4 starts with v0.10.0.0.
func TestVersionMatrixLZ4(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	// Produce lots of messages with all possible combinations of supported
	// protocol versions starting with v0.10 (the first version to support
	// LZ4) and all supported compression codecs.
	testVersions := versionRange(V0_10_0_0)
	allCodecs := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy, CompressionLZ4}
	producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false)

	// When/Then
	consumeMsgs(t, testVersions, producedMessages)
}

// Support for the zstd codec was introduced in v2.1.0.0.
func TestVersionMatrixZstd(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	// Produce lots of messages with all supported protocol versions starting
	// with v2.1.0.0 (the first version to support zstd).
	testVersions := versionRange(V2_1_0_0)
	allCodecs := []CompressionCodec{CompressionZSTD}
	producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false)

	// When/Then
	consumeMsgs(t, testVersions, producedMessages)
}
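
// Support for idempotent production was introduced in v0.11.0.0.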
func TestVersionMatrixIdempotent(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	// Produce lots of messages with all supported protocol versions starting
	// with v0.11 (the first version to support idempotent production).
	testVersions := versionRange(V0_11_0_0)
	producedMessages := produceMsgs(t, testVersions, []CompressionCodec{CompressionNone}, 17, 100, true)

	// When/Then
	consumeMsgs(t, testVersions, producedMessages)
}
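
// TestReadOnlyAndAllCommittedMessages verifies that a consumer configured
// with the ReadCommitted isolation level sees only messages from committed
// transactions (the mixed committed/uncommitted topic is assumed to be
// prepared by the functional test environment).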
func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
	checkKafkaVersion(t, "0.11.0")
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	config := NewConfig()
	config.Consumer.IsolationLevel = ReadCommitted
	config.Version = V0_11_0_0

	consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, consumer)

	pc, err := consumer.ConsumePartition("uncommitted-topic-test-4", 0, OffsetOldest)
	require.NoError(t, err)
	defer safeClose(t, pc)

	msgChannel := pc.Messages()
	for i := 1; i <= 6; i++ {
		msg := <-msgChannel
		require.Equal(t, fmt.Sprintf("Committed %v", i), string(msg.Value))
	}
}
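
// prodMsg2Str formats a produced message's offset and value for test output.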
func prodMsg2Str(prodMsg *ProducerMessage) string {
	return fmt.Sprintf("{offset: %d, value: %s}", prodMsg.Offset, string(prodMsg.Value.(StringEncoder)))
}
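
// consMsg2Str formats a consumed message the same way, so the two can be
// compared side by side in failure messages.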
func consMsg2Str(consMsg *ConsumerMessage) string {
	return fmt.Sprintf("{offset: %d, value: %s}", consMsg.Offset, string(consMsg.Value))
}
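
// versionRange returns all SupportedVersions from lower up to the cluster
// version given by KAFKA_VERSION, inclusive of both bounds. The early return
// relies on SupportedVersions being sorted in ascending order.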
func versionRange(lower KafkaVersion) []KafkaVersion {
	// Get the test cluster version from the environment. If there is nothing
	// there then assume the highest.
	upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION"))
	if err != nil {
		upper = MaxVersion
	}

	versions := make([]KafkaVersion, 0, len(SupportedVersions))
	for _, v := range SupportedVersions {
		if !v.IsAtLeast(lower) {
			continue
		}
		if !upper.IsAtLeast(v) {
			return versions
		}
		versions = append(versions, v)
	}
	return versions
}
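
// produceMsgs concurrently produces countPerVerCodec messages to "test.1"
// for every combination of client version and compression codec, then
// returns the produced messages sorted by offset.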
func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage {
	var wg sync.WaitGroup
	var producedMessagesMu sync.Mutex
	var producedMessages []*ProducerMessage
	for _, prodVer := range clientVersions {
		for _, codec := range codecs {
			prodCfg := NewConfig()
			prodCfg.Version = prodVer
			prodCfg.Producer.Return.Successes = true
			prodCfg.Producer.Return.Errors = true
			prodCfg.Producer.Flush.MaxMessages = flush
			prodCfg.Producer.Compression = codec
			prodCfg.Producer.Idempotent = idempotent
			if idempotent {
				// Idempotent production requires acks from all in-sync
				// replicas and at most one in-flight request.
				prodCfg.Producer.RequiredAcks = WaitForAll
				prodCfg.Net.MaxOpenRequests = 1
			}

			p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
			if err != nil {
				t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
				continue
			}
			defer safeClose(t, p) // producers stay open until all sends complete
			for i := 0; i < countPerVerCodec; i++ {
				msg := &ProducerMessage{
					Topic: "test.1",
					Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)),
				}
				wg.Add(1)
				go func() {
					defer wg.Done()
					_, _, err := p.SendMessage(msg)
					if err != nil {
						t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err)
						return // don't record messages that were never produced
					}
					producedMessagesMu.Lock()
					producedMessages = append(producedMessages, msg)
					producedMessagesMu.Unlock()
				}()
			}
		}
	}
	wg.Wait()

	// Guard against indexing into an empty slice below if every send failed.
	if len(producedMessages) == 0 {
		t.Fatal("Expected at least one message to be produced")
	}

	// Sort produced messages in ascending offset order.
	sort.Slice(producedMessages, func(i, j int) bool {
		return producedMessages[i].Offset < producedMessages[j].Offset
	})
	t.Logf("*** Total produced %d, firstOffset=%d, lastOffset=%d\n",
		len(producedMessages), producedMessages[0].Offset, producedMessages[len(producedMessages)-1].Offset)
	return producedMessages
}
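
// consumeMsgs replays the produced messages once per client version and
// verifies that offsets, payloads, and ordering all match what was produced.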
func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) {
	// Consume all produced messages with all client versions supported by the
	// cluster.
consumerVersionLoop:
	for _, consVer := range clientVersions {
		t.Logf("*** Consuming with client version %s\n", consVer)
		// Create a partition consumer that should start from the first
		// produced message.
		consCfg := NewConfig()
		consCfg.Version = consVer
		c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg)
		if err != nil {
			t.Fatal(err)
		}
		defer safeClose(t, c)
		pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset)
		if err != nil {
			t.Fatal(err)
		}
		defer safeClose(t, pc)
		// Consume as many messages as were produced and make sure that order
		// is preserved.
		for i, prodMsg := range producedMessages {
			select {
			case consMsg := <-pc.Messages():
				if consMsg.Offset != prodMsg.Offset {
					t.Errorf("Consumed unexpected offset: version=%s, index=%d, want=%s, got=%s",
						consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
					continue consumerVersionLoop
				}
				if string(consMsg.Value) != string(prodMsg.Value.(StringEncoder)) {
					t.Errorf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s",
						consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
					continue consumerVersionLoop
				}
			case <-time.After(3 * time.Second):
				t.Fatalf("Timeout waiting for: index=%d, offset=%d, msg=%s", i, prodMsg.Offset, prodMsg.Value)
			}
		}
	}
}