Browse Source

Merge pull request #1198 from mimaison/offset-fetch-new-vers

Support new OffsetFetch request/response
Vlad Gorodetsky 7 years ago
parent
commit
fc8380c11f
5 changed files with 197 additions and 57 deletions
  1. 1 1
      mockresponses.go
  2. 29 10
      offset_fetch_request.go
  3. 32 8
      offset_fetch_request_test.go
  4. 85 31
      offset_fetch_response.go
  5. 50 7
      offset_fetch_response_test.go

+ 1 - 1
mockresponses.go

@@ -523,7 +523,7 @@ func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int3
 		partitions = make(map[int32]*OffsetFetchResponseBlock)
 		topics[topic] = partitions
 	}
-	partitions[partition] = &OffsetFetchResponseBlock{offset, metadata, kerror}
+	partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
 	return mr
 }
 

+ 29 - 10
offset_fetch_request.go

@@ -1,28 +1,33 @@
 package sarama
 
 type OffsetFetchRequest struct {
-	ConsumerGroup string
 	Version       int16
+	ConsumerGroup string
 	partitions    map[string][]int32
 }
 
 func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
-	if r.Version < 0 || r.Version > 1 {
+	if r.Version < 0 || r.Version > 5 {
 		return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
 	}
 
 	if err = pe.putString(r.ConsumerGroup); err != nil {
 		return err
 	}
-	if err = pe.putArrayLength(len(r.partitions)); err != nil {
-		return err
-	}
-	for topic, partitions := range r.partitions {
-		if err = pe.putString(topic); err != nil {
+
+	if r.Version >= 2 && r.partitions == nil {
+		pe.putInt32(-1)
+	} else {
+		if err = pe.putArrayLength(len(r.partitions)); err != nil {
 			return err
 		}
-		if err = pe.putInt32Array(partitions); err != nil {
-			return err
+		for topic, partitions := range r.partitions {
+			if err = pe.putString(topic); err != nil {
+				return err
+			}
+			if err = pe.putInt32Array(partitions); err != nil {
+				return err
+			}
 		}
 	}
 	return nil
@@ -37,7 +42,7 @@ func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error)
 	if err != nil {
 		return err
 	}
-	if partitionCount == 0 {
+	if (partitionCount == 0 && version < 2) || partitionCount < 0 {
 		return nil
 	}
 	r.partitions = make(map[string][]int32)
@@ -67,11 +72,25 @@ func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 1:
 		return V0_8_2_0
+	case 2:
+		return V0_10_2_0
+	case 3:
+		return V0_11_0_0
+	case 4:
+		return V2_0_0_0
+	case 5:
+		return V2_1_0_0
 	default:
 		return MinVersion
 	}
 }
 
+func (r *OffsetFetchRequest) ZeroPartitions() {
+	if r.partitions == nil && r.Version >= 2 {
+		r.partitions = make(map[string][]int32)
+	}
+}
+
 func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {
 	if r.partitions == nil {
 		r.partitions = make(map[string][]int32)

+ 32 - 8
offset_fetch_request_test.go

@@ -1,6 +1,9 @@
 package sarama
 
-import "testing"
+import (
+	"fmt"
+	"testing"
+)
 
 var (
 	offsetFetchRequestNoGroupNoPartitions = []byte{
@@ -17,15 +20,36 @@ var (
 		0x00, 0x0D, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't',
 		0x00, 0x00, 0x00, 0x01,
 		0x4F, 0x4F, 0x4F, 0x4F}
+
+	offsetFetchRequestAllPartitions = []byte{
+		0x00, 0x04, 'b', 'l', 'a', 'h',
+		0xff, 0xff, 0xff, 0xff}
 )
 
-func TestOffsetFetchRequest(t *testing.T) {
-	request := new(OffsetFetchRequest)
-	testRequest(t, "no group, no partitions", request, offsetFetchRequestNoGroupNoPartitions)
+func TestOffsetFetchRequestNoPartitions(t *testing.T) {
+	for version := 0; version <= 5; version++ {
+		request := new(OffsetFetchRequest)
+		request.Version = int16(version)
+		request.ZeroPartitions()
+		testRequest(t, fmt.Sprintf("no group, no partitions %d", version), request, offsetFetchRequestNoGroupNoPartitions)
 
-	request.ConsumerGroup = "blah"
-	testRequest(t, "no partitions", request, offsetFetchRequestNoPartitions)
+		request.ConsumerGroup = "blah"
+		testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitions)
+	}
+}
+func TestOffsetFetchRequest(t *testing.T) {
+	for version := 0; version <= 5; version++ {
+		request := new(OffsetFetchRequest)
+		request.Version = int16(version)
+		request.ConsumerGroup = "blah"
+		request.AddPartition("topicTheFirst", 0x4F4F4F4F)
+		testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartition)
+	}
+}
 
-	request.AddPartition("topicTheFirst", 0x4F4F4F4F)
-	testRequest(t, "one partition", request, offsetFetchRequestOnePartition)
+func TestOffsetFetchRequestAllPartitions(t *testing.T) {
+	for version := 2; version <= 5; version++ {
+		request := &OffsetFetchRequest{Version: int16(version), ConsumerGroup: "blah"}
+		testRequest(t, fmt.Sprintf("all partitions %d", version), request, offsetFetchRequestAllPartitions)
+	}
 }

+ 85 - 31
offset_fetch_response.go

@@ -1,17 +1,25 @@
 package sarama
 
 type OffsetFetchResponseBlock struct {
-	Offset   int64
-	Metadata string
-	Err      KError
+	Offset      int64
+	LeaderEpoch int32
+	Metadata    string
+	Err         KError
 }
 
-func (b *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
+func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
 	b.Offset, err = pd.getInt64()
 	if err != nil {
 		return err
 	}
 
+	if version >= 5 {
+		b.LeaderEpoch, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+	}
+
 	b.Metadata, err = pd.getString()
 	if err != nil {
 		return err
@@ -26,9 +34,13 @@ func (b *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
-func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
+func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
 	pe.putInt64(b.Offset)
 
+	if version >= 5 {
+		pe.putInt32(b.LeaderEpoch)
+	}
+
 	err = pe.putString(b.Metadata)
 	if err != nil {
 		return err
@@ -40,10 +52,17 @@ func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
 }
 
 type OffsetFetchResponse struct {
-	Blocks map[string]map[int32]*OffsetFetchResponseBlock
+	Version        int16
+	ThrottleTimeMs int32
+	Blocks         map[string]map[int32]*OffsetFetchResponseBlock
+	Err            KError
 }
 
 func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
+	if r.Version >= 3 {
+		pe.putInt32(r.ThrottleTimeMs)
+	}
+
 	if err := pe.putArrayLength(len(r.Blocks)); err != nil {
 		return err
 	}
@@ -56,51 +75,73 @@ func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
 		}
 		for partition, block := range partitions {
 			pe.putInt32(partition)
-			if err := block.encode(pe); err != nil {
+			if err := block.encode(pe, r.Version); err != nil {
 				return err
 			}
 		}
 	}
+	if r.Version >= 2 {
+		pe.putInt16(int16(r.Err))
+	}
 	return nil
 }
 
 func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
-	numTopics, err := pd.getArrayLength()
-	if err != nil || numTopics == 0 {
-		return err
-	}
-
-	r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
-	for i := 0; i < numTopics; i++ {
-		name, err := pd.getString()
-		if err != nil {
-			return err
-		}
+	r.Version = version
 
-		numBlocks, err := pd.getArrayLength()
+	if version >= 3 {
+		r.ThrottleTimeMs, err = pd.getInt32()
 		if err != nil {
 			return err
 		}
+	}
 
-		if numBlocks == 0 {
-			r.Blocks[name] = nil
-			continue
-		}
-		r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
+	numTopics, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
 
-		for j := 0; j < numBlocks; j++ {
-			id, err := pd.getInt32()
+	if numTopics > 0 {
+		r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
+		for i := 0; i < numTopics; i++ {
+			name, err := pd.getString()
 			if err != nil {
 				return err
 			}
 
-			block := new(OffsetFetchResponseBlock)
-			err = block.decode(pd)
+			numBlocks, err := pd.getArrayLength()
 			if err != nil {
 				return err
 			}
-			r.Blocks[name][id] = block
+
+			if numBlocks == 0 {
+				r.Blocks[name] = nil
+				continue
+			}
+			r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
+
+			for j := 0; j < numBlocks; j++ {
+				id, err := pd.getInt32()
+				if err != nil {
+					return err
+				}
+
+				block := new(OffsetFetchResponseBlock)
+				err = block.decode(pd, version)
+				if err != nil {
+					return err
+				}
+				r.Blocks[name][id] = block
+			}
+		}
+	}
+
+	if version >= 2 {
+		kerr, err := pd.getInt16()
+		if err != nil {
+			return err
 		}
+		r.Err = KError(kerr)
 	}
 
 	return nil
@@ -111,11 +152,24 @@ func (r *OffsetFetchResponse) key() int16 {
 }
 
 func (r *OffsetFetchResponse) version() int16 {
-	return 0
+	return r.Version
 }
 
 func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
-	return MinVersion
+	switch r.Version {
+	case 1:
+		return V0_8_2_0
+	case 2:
+		return V0_10_2_0
+	case 3:
+		return V0_11_0_0
+	case 4:
+		return V2_0_0_0
+	case 5:
+		return V2_1_0_0
+	default:
+		return MinVersion
+	}
 }
 
 func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {

+ 50 - 7
offset_fetch_response_test.go

@@ -1,22 +1,65 @@
 package sarama
 
-import "testing"
+import (
+	"fmt"
+	"testing"
+)
 
 var (
 	emptyOffsetFetchResponse = []byte{
 		0x00, 0x00, 0x00, 0x00}
+
+	emptyOffsetFetchResponseV2 = []byte{
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x2A}
+
+	emptyOffsetFetchResponseV3 = []byte{
+		0x00, 0x00, 0x00, 0x09,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x2A}
 )
 
 func TestEmptyOffsetFetchResponse(t *testing.T) {
-	response := OffsetFetchResponse{}
-	testResponse(t, "empty", &response, emptyOffsetFetchResponse)
+	for version := 0; version <= 1; version++ {
+		response := OffsetFetchResponse{Version: int16(version)}
+		testResponse(t, fmt.Sprintf("empty v%d", version), &response, emptyOffsetFetchResponse)
+	}
+
+	responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest}
+	testResponse(t, "empty V2", &responseV2, emptyOffsetFetchResponseV2)
+
+	for version := 3; version <= 5; version++ {
+		responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9}
+		testResponse(t, fmt.Sprintf("empty v%d", version), &responseV3, emptyOffsetFetchResponseV3)
+	}
 }
 
 func TestNormalOffsetFetchResponse(t *testing.T) {
-	response := OffsetFetchResponse{}
-	response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, "md", ErrRequestTimedOut})
-	response.Blocks["m"] = nil
 	// The response encoded form cannot be checked for it varies due to
 	// unpredictable map traversal order.
-	testResponse(t, "normal", &response, nil)
+	// Hence the 'nil' as byte[] parameter in the 'testResponse(..)' calls
+
+	for version := 0; version <= 1; version++ {
+		response := OffsetFetchResponse{Version: int16(version)}
+		response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
+		response.Blocks["m"] = nil
+		testResponse(t, fmt.Sprintf("Normal v%d", version), &response, nil)
+	}
+
+	responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest}
+	responseV2.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
+	responseV2.Blocks["m"] = nil
+	testResponse(t, "normal V2", &responseV2, nil)
+
+	for version := 3; version <= 4; version++ {
+		responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9}
+		responseV3.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
+		responseV3.Blocks["m"] = nil
+		testResponse(t, fmt.Sprintf("Normal v%d", version), &responseV3, nil)
+	}
+
+	responseV5 := OffsetFetchResponse{Version: 5, Err: ErrInvalidRequest, ThrottleTimeMs: 9}
+	responseV5.AddBlock("t", 0, &OffsetFetchResponseBlock{Offset: 10, LeaderEpoch: 100, Metadata: "md", Err: ErrRequestTimedOut})
+	responseV5.Blocks["m"] = nil
+	testResponse(t, "normal V5", &responseV5, nil)
 }