
Merge pull request #1040 from mailgun/maxim/test

 Add full version matrix produce/consume test
Evan Huus 7 years ago
parent
commit
44e7121d3b

+ 1 - 1
Makefile

@@ -4,7 +4,7 @@ default: fmt vet errcheck test
 test:
 	echo "" > coverage.txt
 	for d in `go list ./... | grep -v vendor`; do \
-		go test -v -timeout 60s -race -coverprofile=profile.out -covermode=atomic $$d; \
+		go test -p 1 -v -timeout 90s -race -coverprofile=profile.out -covermode=atomic $$d || exit 1; \
 		if [ -f profile.out ]; then \
 			cat profile.out >> coverage.txt; \
 			rm profile.out; \

+ 1 - 6
alter_configs_request_test.go

@@ -31,11 +31,7 @@ var (
 		'1', '0', '0', '0',
 		2,                   // a topic
 		0, 3, 'b', 'a', 'r', // topic name: bar
-		0, 0, 0, 2, //2 config
-		0, 10, // 10 chars
-		's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
-		0, 4,
-		'1', '0', '0', '0',
+		0, 0, 0, 1, // 1 config
 		0, 12, // 12 chars
 		'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's',
 		0, 4,
@@ -80,7 +76,6 @@ func TestAlterConfigsRequest(t *testing.T) {
 				Type: TopicResource,
 				Name: "bar",
 				ConfigEntries: map[string]*string{
-					"segment.ms":   &configValue,
 					"retention.ms": &configValue,
 				},
 			},

+ 4 - 1
async_producer_test.go

@@ -131,9 +131,12 @@ func TestAsyncProducer(t *testing.T) {
 			if msg.Metadata.(int) != i {
 				t.Error("Message metadata did not match")
 			}
+		case <-time.After(time.Second):
+			t.Errorf("Timeout waiting for msg #%d", i)
+			goto done
 		}
 	}
-
+done:
 	closeProducer(t, producer)
 	leader.Close()
 	seedBroker.Close()
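
The timeout guard added here is a pattern worth keeping in mind for channel-based tests: every receive gets a time.After escape hatch, so a stalled producer fails the test instead of hanging the whole run. A minimal self-contained sketch (illustrative names, assumes testing and time are imported):

func expectMessage(t *testing.T, ch <-chan int, want int) {
	select {
	case got := <-ch:
		if got != want {
			t.Errorf("got %d, want %d", got, want)
		}
	case <-time.After(time.Second):
		// Fail fast instead of blocking the test binary forever.
		t.Errorf("timeout waiting for %d", want)
	}
}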

+ 1 - 1
config.go

@@ -310,7 +310,7 @@ func NewConfig() *Config {
 
 	c.ClientID = defaultClientID
 	c.ChannelBufferSize = 256
-	c.Version = minVersion
+	c.Version = MinVersion
 	c.MetricRegistry = metrics.NewRegistry()
 
 	return c

+ 25 - 52
consumer.go

@@ -482,9 +482,6 @@ feederLoop:
 
 func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
 	var messages []*ConsumerMessage
-	var incomplete bool
-	prelude := true
-
 	for _, msgBlock := range msgSet.Messages {
 		for _, msg := range msgBlock.Messages() {
 			offset := msg.Offset
@@ -492,29 +489,22 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
 				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
 				offset += baseOffset
 			}
-			if prelude && offset < child.offset {
+			if offset < child.offset {
 				continue
 			}
-			prelude = false
-
-			if offset >= child.offset {
-				messages = append(messages, &ConsumerMessage{
-					Topic:          child.topic,
-					Partition:      child.partition,
-					Key:            msg.Msg.Key,
-					Value:          msg.Msg.Value,
-					Offset:         offset,
-					Timestamp:      msg.Msg.Timestamp,
-					BlockTimestamp: msgBlock.Msg.Timestamp,
-				})
-				child.offset = offset + 1
-			} else {
-				incomplete = true
-			}
+			messages = append(messages, &ConsumerMessage{
+				Topic:          child.topic,
+				Partition:      child.partition,
+				Key:            msg.Msg.Key,
+				Value:          msg.Msg.Value,
+				Offset:         offset,
+				Timestamp:      msg.Msg.Timestamp,
+				BlockTimestamp: msgBlock.Msg.Timestamp,
+			})
+			child.offset = offset + 1
 		}
 	}
-
-	if incomplete || len(messages) == 0 {
+	if len(messages) == 0 {
 		return nil, ErrIncompleteResponse
 	}
 	return messages, nil
@@ -522,42 +512,25 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
 
 func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
 	var messages []*ConsumerMessage
-	var incomplete bool
-	prelude := true
-	originalOffset := child.offset
-
 	for _, rec := range batch.Records {
 		offset := batch.FirstOffset + rec.OffsetDelta
-		if prelude && offset < child.offset {
+		if offset < child.offset {
 			continue
 		}
-		prelude = false
-
-		if offset >= child.offset {
-			messages = append(messages, &ConsumerMessage{
-				Topic:     child.topic,
-				Partition: child.partition,
-				Key:       rec.Key,
-				Value:     rec.Value,
-				Offset:    offset,
-				Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
-				Headers:   rec.Headers,
-			})
-			child.offset = offset + 1
-		} else {
-			incomplete = true
-		}
-	}
-
-	if incomplete {
+		messages = append(messages, &ConsumerMessage{
+			Topic:     child.topic,
+			Partition: child.partition,
+			Key:       rec.Key,
+			Value:     rec.Value,
+			Offset:    offset,
+			Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
+			Headers:   rec.Headers,
+		})
+		child.offset = offset + 1
+	}
+	if len(messages) == 0 {
 		return nil, ErrIncompleteResponse
 	}
-
-	child.offset = batch.FirstOffset + int64(batch.LastOffsetDelta) + 1
-	if child.offset <= originalOffset {
-		return nil, ErrConsumerOffsetNotAdvanced
-	}
-
 	return messages, nil
 }
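
After the rewrite, both parse paths reduce to a single rule: skip any message whose offset is below the requested one (e.g. the prelude of a compressed batch that straddles the requested offset), append everything else, and return ErrIncompleteResponse only when nothing survives. A standalone sketch of that rule over bare offsets (simplified types, errors imported; not the library's API):

// keepFromOffset mirrors the simplified parseMessages/parseRecords loops:
// with start=5 and a batch covering offsets 3..7, it returns 5, 6, 7.
func keepFromOffset(offsets []int64, start int64) ([]int64, error) {
	var kept []int64
	for _, off := range offsets {
		if off < start {
			continue // prelude of a compressed batch; already consumed
		}
		kept = append(kept, off)
	}
	if len(kept) == 0 {
		return nil, errors.New("incomplete response")
	}
	return kept, nil
}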
 

+ 1 - 1
fetch_request.go

@@ -149,7 +149,7 @@ func (r *FetchRequest) requiredVersion() KafkaVersion {
 	case 4:
 		return V0_11_0_0
 	default:
-		return minVersion
+		return MinVersion
 	}
 }
 

+ 1 - 1
fetch_response.go

@@ -280,7 +280,7 @@ func (r *FetchResponse) requiredVersion() KafkaVersion {
 	case 4:
 		return V0_11_0_0
 	default:
-		return minVersion
+		return MinVersion
 	}
 }
 

+ 166 - 1
functional_consumer_test.go

@@ -1,8 +1,13 @@
 package sarama
 
 import (
+	"fmt"
 	"math"
+	"os"
+	"sort"
+	"sync"
 	"testing"
+	"time"
 )
 
 func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
@@ -46,7 +51,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
 	}
 	defer safeClose(t, c)
 
-	pc, err := c.ConsumePartition("test.1", 0, OffsetOldest)
+	pc, err := c.ConsumePartition("test.1", 0, offset)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -59,3 +64,163 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
 
 	safeClose(t, pc)
 }
+
+// Makes sure that messages produced with all supported client version and
+// compression codec combinations (except LZ4) can be consumed by all
+// supported consumer versions. It relies on the KAFKA_VERSION environment
+// variable to provide the version of the test Kafka cluster.
+//
+// Note that the LZ4 codec was introduced in v0.10.0.0 and is therefore
+// excluded from this test case. A similar version matrix test case below
+// covers LZ4, checking only versions from v0.10.0.0 up to KAFKA_VERSION.
+func TestVersionMatrix(t *testing.T) {
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
+
+	// Produce lots of messages with all possible combinations of supported
+	// protocol versions and compression codecs, except for LZ4.
+	testVersions := versionRange(V0_8_2_0)
+	allCodecsButLZ4 := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy}
+	producedMessages := produceMsgs(t, testVersions, allCodecsButLZ4, 17, 100)
+
+	// When/Then
+	consumeMsgs(t, testVersions, producedMessages)
+}
+
+// Support for LZ4 codec was introduced in v0.10.0.0 so a version matrix to
+// test LZ4 should start with v0.10.0.0.
+func TestVersionMatrixLZ4(t *testing.T) {
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
+
+	// Produce lots of messages with all possible combinations of supported
+	// protocol versions starting with v0.10.0.0 (the first with LZ4 support)
+	// and all possible compression codecs.
+	testVersions := versionRange(V0_10_0_0)
+	allCodecs := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy, CompressionLZ4}
+	producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100)
+
+	// When/Then
+	consumeMsgs(t, testVersions, producedMessages)
+}
+
+func prodMsg2Str(prodMsg *ProducerMessage) string {
+	return fmt.Sprintf("{offset: %d, value: %s}", prodMsg.Offset, string(prodMsg.Value.(StringEncoder)))
+}
+
+func consMsg2Str(consMsg *ConsumerMessage) string {
+	return fmt.Sprintf("{offset: %d, value: %s}", consMsg.Offset, string(consMsg.Value))
+}
+
+func versionRange(lower KafkaVersion) []KafkaVersion {
+	// Get the test cluster version from the environment. If it is not set,
+	// assume the highest supported version.
+	upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION"))
+	if err != nil {
+		upper = MaxVersion
+	}
+
+	versions := make([]KafkaVersion, 0, len(SupportedVersions))
+	for _, v := range SupportedVersions {
+		if !v.IsAtLeast(lower) {
+			continue
+		}
+		if !upper.IsAtLeast(v) {
+			return versions
+		}
+		versions = append(versions, v)
+	}
+	return versions
+}
+
+func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int) []*ProducerMessage {
+	var wg sync.WaitGroup
+	var producedMessagesMu sync.Mutex
+	var producedMessages []*ProducerMessage
+	for _, prodVer := range clientVersions {
+		for _, codec := range codecs {
+			prodCfg := NewConfig()
+			prodCfg.Version = prodVer
+			prodCfg.Producer.Return.Successes = true
+			prodCfg.Producer.Return.Errors = true
+			prodCfg.Producer.Flush.MaxMessages = flush
+			prodCfg.Producer.Compression = codec
+
+			p, err := NewSyncProducer(kafkaBrokers, prodCfg)
+			if err != nil {
+				t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
+				continue
+			}
+			defer safeClose(t, p)
+			for i := 0; i < countPerVerCodec; i++ {
+				msg := &ProducerMessage{
+					Topic: "test.1",
+					Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)),
+				}
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					_, _, err := p.SendMessage(msg)
+					if err != nil {
+						t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err)
+					}
+					producedMessagesMu.Lock()
+					producedMessages = append(producedMessages, msg)
+					producedMessagesMu.Unlock()
+				}()
+			}
+		}
+	}
+	wg.Wait()
+
+	// Sort produced messages in ascending offset order.
+	sort.Slice(producedMessages, func(i, j int) bool {
+		return producedMessages[i].Offset < producedMessages[j].Offset
+	})
+	t.Logf("*** Total produced %d, firstOffset=%d, lastOffset=%d\n",
+		len(producedMessages), producedMessages[0].Offset, producedMessages[len(producedMessages)-1].Offset)
+	return producedMessages
+}
+
+func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) {
+	// Consume all produced messages with all client versions supported by the
+	// cluster.
+consumerVersionLoop:
+	for _, consVer := range clientVersions {
+		t.Logf("*** Consuming with client version %s\n", consVer)
+		// Create a partition consumer that should start from the first produced
+		// message.
+		consCfg := NewConfig()
+		consCfg.Version = consVer
+		c, err := NewConsumer(kafkaBrokers, consCfg)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer safeClose(t, c)
+		pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer safeClose(t, pc)
+
+		// Consume as many messages as were produced and make sure that the
+		// order is preserved.
+		for i, prodMsg := range producedMessages {
+			select {
+			case consMsg := <-pc.Messages():
+				if consMsg.Offset != prodMsg.Offset {
+					t.Errorf("Consumed unexpected offset: version=%s, index=%d, want=%s, got=%s",
+						consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
+					continue consumerVersionLoop
+				}
+				if string(consMsg.Value) != string(prodMsg.Value.(StringEncoder)) {
+					t.Errorf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s",
+						consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
+					continue consumerVersionLoop
+				}
+			case <-time.After(3 * time.Second):
+				t.Fatalf("Timeout waiting for: index=%d, offset=%d, msg=%s", i, prodMsg.Offset, prodMsg.Value)
+			}
+		}
+	}
+}
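
For a concrete feel of how versionRange caps the matrix, here is a hypothetical run (the environment value is illustrative):

// Assuming the test cluster advertises 0.10.1.0:
os.Setenv("KAFKA_VERSION", "0.10.1.0")
vs := versionRange(V0_8_2_0)
// vs now holds V0_8_2_0 through V0_10_1_0 (eight versions); the loop
// returns early at V0_10_1_1 because upper.IsAtLeast(V0_10_1_1) is false.
fmt.Println(len(vs)) // 8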

+ 9 - 0
message.go

@@ -24,6 +24,15 @@ const (
 	CompressionLZ4    CompressionCodec = 3
 )
 
+func (cc CompressionCodec) String() string {
+	return []string{
+		"none",
+		"gzip",
+		"snappy",
+		"lz4",
+	}[int(cc)]
+}
+
 // CompressionLevelDefault is the constant to use in CompressionLevel
 // to have the default compression level for any codec. The value is picked
 // that we don't use any existing compression levels.
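
The new Stringer means codec values format readably under %s/%v, which is what lets the matrix test embed the codec in message payloads. A tiny usage sketch (assumes the sarama package is imported):

s := fmt.Sprintf("msg:%s:%d", sarama.CompressionGZIP, 7)
fmt.Println(s) // msg:gzip:7

One caveat of the slice-indexing approach: a codec value outside 0..3 panics instead of printing a fallback, so it is only safe for the known constants.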

+ 1 - 1
metadata_request.go

@@ -48,5 +48,5 @@ func (r *MetadataRequest) version() int16 {
 }
 
 func (r *MetadataRequest) requiredVersion() KafkaVersion {
-	return minVersion
+	return MinVersion
 }

+ 1 - 1
metadata_response.go

@@ -185,7 +185,7 @@ func (r *MetadataResponse) version() int16 {
 }
 
 func (r *MetadataResponse) requiredVersion() KafkaVersion {
-	return minVersion
+	return MinVersion
 }
 
 // testing API

+ 1 - 1
offset_commit_request.go

@@ -173,7 +173,7 @@ func (r *OffsetCommitRequest) requiredVersion() KafkaVersion {
 	case 2:
 		return V0_9_0_0
 	default:
-		return minVersion
+		return MinVersion
 	}
 }
 

+ 1 - 1
offset_commit_response.go

@@ -81,5 +81,5 @@ func (r *OffsetCommitResponse) version() int16 {
 }
 
 func (r *OffsetCommitResponse) requiredVersion() KafkaVersion {
-	return minVersion
+	return MinVersion
 }

+ 1 - 1
offset_fetch_request.go

@@ -68,7 +68,7 @@ func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
 	case 1:
 		return V0_8_2_0
 	default:
-		return minVersion
+		return MinVersion
 	}
 }
 

+ 1 - 1
offset_fetch_response.go

@@ -115,7 +115,7 @@ func (r *OffsetFetchResponse) version() int16 {
 }
 
 func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
-	return minVersion
+	return MinVersion
 }
 
 func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {

+ 1 - 1
offset_request.go

@@ -109,7 +109,7 @@ func (r *OffsetRequest) requiredVersion() KafkaVersion {
 	case 1:
 		return V0_10_1_0
 	default:
-		return minVersion
+		return MinVersion
 	}
 }
 

+ 1 - 1
offset_response.go

@@ -155,7 +155,7 @@ func (r *OffsetResponse) requiredVersion() KafkaVersion {
 	case 1:
 		return V0_10_1_0
 	default:
-		return minVersion
+		return MinVersion
 	}
 }
 

+ 1 - 1
produce_request.go

@@ -215,7 +215,7 @@ func (r *ProduceRequest) requiredVersion() KafkaVersion {
 	case 3:
 		return V0_11_0_0
 	default:
-		return minVersion
+		return MinVersion
 	}
 }
 

+ 3 - 1
produce_request_test.go

@@ -99,6 +99,8 @@ func TestProduceRequest(t *testing.T) {
 	}
 	request.AddBatch("topic", 0xAD, batch)
 	packet := testRequestEncode(t, "one record", request, produceRequestOneRecord)
-	batch.Records[0].length.startOffset = 0
+	// The compressedRecords field is not populated on decoding because
+	// consumers are only interested in the decoded records.
+	batch.compressedRecords = nil
 	testRequestDecode(t, "one record", request, packet)
 }

+ 1 - 1
produce_response.go

@@ -152,7 +152,7 @@ func (r *ProduceResponse) requiredVersion() KafkaVersion {
 	case 3:
 		return V0_11_0_0
 	default:
-		return minVersion
+		return MinVersion
 	}
 }
 

+ 9 - 5
record_test.go

@@ -116,11 +116,12 @@ var recordBatchTestCases = []struct {
 	{
 		name: "gzipped record",
 		batch: RecordBatch{
-			Version:         2,
-			Codec:           CompressionGZIP,
-			FirstTimestamp:  time.Unix(1479847795, 0),
-			MaxTimestamp:    time.Unix(0, 0),
-			LastOffsetDelta: 0,
+			Version:          2,
+			Codec:            CompressionGZIP,
+			CompressionLevel: CompressionLevelDefault,
+			FirstTimestamp:   time.Unix(1479847795, 0),
+			MaxTimestamp:     time.Unix(0, 0),
+			LastOffsetDelta:  0,
 			Records: []*Record{{
 				TimestampDelta: 5 * time.Millisecond,
 				Key:            []byte{1, 2, 3, 4},
@@ -281,6 +282,9 @@ func TestRecordBatchDecoding(t *testing.T) {
 		for _, r := range tc.batch.Records {
 			r.length = varintLengthField{}
 		}
+		// The compression level is not restored on decoding; it is not needed
+		// anyway. We only set it here to ensure that the comparison succeeds.
+		batch.CompressionLevel = tc.batch.CompressionLevel
 		if !reflect.DeepEqual(batch, tc.batch) {
 			t.Errorf(spew.Sprintf("invalid decode of %s\ngot %+v\nwanted %+v", tc.name, batch, tc.batch))
 		}

+ 39 - 13
utils.go

@@ -139,21 +139,47 @@ func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool {
 
 // Effective constants defining the supported kafka versions.
 var (
-	V0_8_2_0   = newKafkaVersion(0, 8, 2, 0)
-	V0_8_2_1   = newKafkaVersion(0, 8, 2, 1)
-	V0_8_2_2   = newKafkaVersion(0, 8, 2, 2)
-	V0_9_0_0   = newKafkaVersion(0, 9, 0, 0)
-	V0_9_0_1   = newKafkaVersion(0, 9, 0, 1)
-	V0_10_0_0  = newKafkaVersion(0, 10, 0, 0)
-	V0_10_0_1  = newKafkaVersion(0, 10, 0, 1)
-	V0_10_1_0  = newKafkaVersion(0, 10, 1, 0)
-	V0_10_2_0  = newKafkaVersion(0, 10, 2, 0)
-	V0_11_0_0  = newKafkaVersion(0, 11, 0, 0)
-	V1_0_0_0   = newKafkaVersion(1, 0, 0, 0)
-	minVersion = V0_8_2_0
+	V0_8_2_0  = newKafkaVersion(0, 8, 2, 0)
+	V0_8_2_1  = newKafkaVersion(0, 8, 2, 1)
+	V0_8_2_2  = newKafkaVersion(0, 8, 2, 2)
+	V0_9_0_0  = newKafkaVersion(0, 9, 0, 0)
+	V0_9_0_1  = newKafkaVersion(0, 9, 0, 1)
+	V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
+	V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
+	V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
+	V0_10_1_1 = newKafkaVersion(0, 10, 1, 1)
+	V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
+	V0_10_2_1 = newKafkaVersion(0, 10, 2, 1)
+	V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
+	V0_11_0_1 = newKafkaVersion(0, 11, 0, 1)
+	V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
+	V1_0_0_0  = newKafkaVersion(1, 0, 0, 0)
+
+	SupportedVersions = []KafkaVersion{
+		V0_8_2_0,
+		V0_8_2_1,
+		V0_8_2_2,
+		V0_9_0_0,
+		V0_9_0_1,
+		V0_10_0_0,
+		V0_10_0_1,
+		V0_10_1_0,
+		V0_10_1_1,
+		V0_10_2_0,
+		V0_10_2_1,
+		V0_11_0_0,
+		V0_11_0_1,
+		V0_11_0_2,
+		V1_0_0_0,
+	}
+	MinVersion = V0_8_2_0
+	MaxVersion = V1_0_0_0
 )
 
 func ParseKafkaVersion(s string) (KafkaVersion, error) {
+	if len(s) < 5 {
+		return MinVersion, fmt.Errorf("invalid version `%s`", s)
+	}
 	var major, minor, veryMinor, patch uint
 	var err error
 	if s[0] == '0' {
@@ -162,7 +188,7 @@ func ParseKafkaVersion(s string) (KafkaVersion, error) {
 		err = scanKafkaVersion(s, `^\d+\.\d+\.\d+$`, "%d.%d.%d", [3]*uint{&major, &minor, &veryMinor})
 	}
 	if err != nil {
-		return minVersion, err
+		return MinVersion, err
 	}
 	return newKafkaVersion(major, minor, veryMinor, patch), nil
 }
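
With MinVersion and MaxVersion now exported, callers can parse user-supplied versions and fall back sensibly. A hedged usage sketch (external package, error handling shortened):

v, err := sarama.ParseKafkaVersion(os.Getenv("KAFKA_VERSION"))
if err != nil {
	// ParseKafkaVersion returns MinVersion alongside the error; pick the
	// fallback explicitly rather than relying on that.
	v = sarama.MaxVersion
}
cfg := sarama.NewConfig()
cfg.Version = v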