ソースを参照

Added OffsetFetch support up to v5

Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Mickael Maison 5 年 前
コミット
6c7918ef5a
5 ファイル変更120 行追加35 行削除
  1. 1 1
      mockresponses.go
  2. 14 2
      offset_fetch_request.go
  3. 23 10
      offset_fetch_request_test.go
  4. 40 10
      offset_fetch_response.go
  5. 42 12
      offset_fetch_response_test.go

+ 1 - 1
mockresponses.go

@@ -523,7 +523,7 @@ func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int3
 		partitions = make(map[int32]*OffsetFetchResponseBlock)
 		topics[topic] = partitions
 	}
-	partitions[partition] = &OffsetFetchResponseBlock{offset, metadata, kerror}
+	partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
 	return mr
 }
 

+ 14 - 2
offset_fetch_request.go

@@ -7,7 +7,7 @@ type OffsetFetchRequest struct {
 }
 
 func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
-	if r.Version < 0 || r.Version > 2 {
+	if r.Version < 0 || r.Version > 5 {
 		return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
 	}
 
@@ -42,7 +42,7 @@ func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error)
 	if err != nil {
 		return err
 	}
-	if partitionCount <= 0 {
+	if (partitionCount == 0 && version < 2) || partitionCount < 0 {
 		return nil
 	}
 	r.partitions = make(map[string][]int32)
@@ -74,11 +74,23 @@ func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
 		return V0_8_2_0
 	case 2:
 		return V0_10_2_0
+	case 3:
+		return V0_11_0_0
+	case 4:
+		return V2_0_0_0
+	case 5:
+		return V2_1_0_0
 	default:
 		return MinVersion
 	}
 }
 
+func (r *OffsetFetchRequest) ZeroPartitions() {
+	if r.partitions == nil && r.Version >= 2 {
+		r.partitions = make(map[string][]int32)
+	}
+}
+
 func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {
 	if r.partitions == nil {
 		r.partitions = make(map[string][]int32)

+ 23 - 10
offset_fetch_request_test.go

@@ -1,6 +1,7 @@
 package sarama
 
 import (
+	"fmt"
 	"testing"
 )
 
@@ -25,18 +26,30 @@ var (
 		0xff, 0xff, 0xff, 0xff}
 )
 
-func TestOffsetFetchRequest(t *testing.T) {
-	request := new(OffsetFetchRequest)
-	testRequest(t, "no group, no partitions", request, offsetFetchRequestNoGroupNoPartitions)
-
-	request.ConsumerGroup = "blah"
-	testRequest(t, "no partitions", request, offsetFetchRequestNoPartitions)
+func TestOffsetFetchRequestNoPartitions(t *testing.T) {
+	for version := 0; version <= 5; version++ {
+		request := new(OffsetFetchRequest)
+		request.Version = int16(version)
+		request.ZeroPartitions()
+		testRequest(t, fmt.Sprintf("no group, no partitions %d", version), request, offsetFetchRequestNoGroupNoPartitions)
 
-	request.AddPartition("topicTheFirst", 0x4F4F4F4F)
-	testRequest(t, "one partition", request, offsetFetchRequestOnePartition)
+		request.ConsumerGroup = "blah"
+		testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitions)
+	}
+}
+func TestOffsetFetchRequest(t *testing.T) {
+	for version := 0; version <= 5; version++ {
+		request := new(OffsetFetchRequest)
+		request.Version = int16(version)
+		request.ConsumerGroup = "blah"
+		request.AddPartition("topicTheFirst", 0x4F4F4F4F)
+		testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartition)
+	}
 }
 
 func TestOffsetFetchRequestAllPartitions(t *testing.T) {
-	requestV2 := &OffsetFetchRequest{Version: 2, ConsumerGroup: "blah"}
-	testRequest(t, "all partitions", requestV2, offsetFetchRequestAllPartitions)
+	for version := 2; version <= 5; version++ {
+		request := &OffsetFetchRequest{Version: int16(version), ConsumerGroup: "blah"}
+		testRequest(t, fmt.Sprintf("all partitions %d", version), request, offsetFetchRequestAllPartitions)
+	}
 }

+ 40 - 10
offset_fetch_response.go

@@ -1,17 +1,25 @@
 package sarama
 
 type OffsetFetchResponseBlock struct {
-	Offset   int64
-	Metadata string
-	Err      KError
+	Offset      int64
+	LeaderEpoch int32
+	Metadata    string
+	Err         KError
 }
 
-func (b *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
+func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
 	b.Offset, err = pd.getInt64()
 	if err != nil {
 		return err
 	}
 
+	if version >= 5 {
+		b.LeaderEpoch, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+	}
+
 	b.Metadata, err = pd.getString()
 	if err != nil {
 		return err
@@ -26,9 +34,13 @@ func (b *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
-func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
+func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
 	pe.putInt64(b.Offset)
 
+	if version >= 5 {
+		pe.putInt32(b.LeaderEpoch)
+	}
+
 	err = pe.putString(b.Metadata)
 	if err != nil {
 		return err
@@ -40,12 +52,17 @@ func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
 }
 
 type OffsetFetchResponse struct {
-	Version int16
-	Blocks  map[string]map[int32]*OffsetFetchResponseBlock
-	Err     KError
+	Version        int16
+	ThrottleTimeMs int32
+	Blocks         map[string]map[int32]*OffsetFetchResponseBlock
+	Err            KError
 }
 
 func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
+	if r.Version >= 3 {
+		pe.putInt32(r.ThrottleTimeMs)
+	}
+
 	if err := pe.putArrayLength(len(r.Blocks)); err != nil {
 		return err
 	}
@@ -58,7 +75,7 @@ func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
 		}
 		for partition, block := range partitions {
 			pe.putInt32(partition)
-			if err := block.encode(pe); err != nil {
+			if err := block.encode(pe, r.Version); err != nil {
 				return err
 			}
 		}
@@ -72,6 +89,13 @@ func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
 func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
 	r.Version = version
 
+	if version >= 3 {
+		r.ThrottleTimeMs, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+	}
+
 	numTopics, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -103,7 +127,7 @@ func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error
 				}
 
 				block := new(OffsetFetchResponseBlock)
-				err = block.decode(pd)
+				err = block.decode(pd, version)
 				if err != nil {
 					return err
 				}
@@ -137,6 +161,12 @@ func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
 		return V0_8_2_0
 	case 2:
 		return V0_10_2_0
+	case 3:
+		return V0_11_0_0
+	case 4:
+		return V2_0_0_0
+	case 5:
+		return V2_1_0_0
 	default:
 		return MinVersion
 	}

+ 42 - 12
offset_fetch_response_test.go

@@ -1,35 +1,65 @@
 package sarama
 
-import "testing"
+import (
+	"fmt"
+	"testing"
+)
 
 var (
 	emptyOffsetFetchResponse = []byte{
 		0x00, 0x00, 0x00, 0x00}
+
 	emptyOffsetFetchResponseV2 = []byte{
 		0x00, 0x00, 0x00, 0x00,
 		0x00, 0x2A}
+
+	emptyOffsetFetchResponseV3 = []byte{
+		0x00, 0x00, 0x00, 0x09,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x2A}
 )
 
 func TestEmptyOffsetFetchResponse(t *testing.T) {
-	response := OffsetFetchResponse{}
-	testResponse(t, "empty", &response, emptyOffsetFetchResponse)
+	for version := 0; version <= 1; version++ {
+		response := OffsetFetchResponse{Version: int16(version)}
+		testResponse(t, fmt.Sprintf("empty v%d", version), &response, emptyOffsetFetchResponse)
+	}
 
 	responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest}
-	testResponse(t, "emptyV2", &responseV2, emptyOffsetFetchResponseV2)
+	testResponse(t, "empty V2", &responseV2, emptyOffsetFetchResponseV2)
+
+	for version := 3; version <= 5; version++ {
+		responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9}
+		testResponse(t, fmt.Sprintf("empty v%d", version), &responseV3, emptyOffsetFetchResponseV3)
+	}
 }
 
 func TestNormalOffsetFetchResponse(t *testing.T) {
-	response := OffsetFetchResponse{}
-	response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, "md", ErrRequestTimedOut})
-	response.Blocks["m"] = nil
 	// The response encoded form cannot be checked for it varies due to
 	// unpredictable map traversal order.
-	testResponse(t, "normal", &response, nil)
+	// Hence the 'nil' as byte[] parameter in the 'testResponse(..)' calls
+
+	for version := 0; version <= 1; version++ {
+		response := OffsetFetchResponse{Version: int16(version)}
+		response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
+		response.Blocks["m"] = nil
+		testResponse(t, fmt.Sprintf("Normal v%d", version), &response, nil)
+	}
 
 	responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest}
-	responseV2.AddBlock("t", 0, &OffsetFetchResponseBlock{0, "md", ErrRequestTimedOut})
+	responseV2.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
 	responseV2.Blocks["m"] = nil
-	// The response encoded form cannot be checked for it varies due to
-	// unpredictable map traversal order.
-	testResponse(t, "normalV2", &responseV2, nil)
+	testResponse(t, "normal V2", &responseV2, nil)
+
+	for version := 3; version <= 4; version++ {
+		responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9}
+		responseV3.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
+		responseV3.Blocks["m"] = nil
+		testResponse(t, fmt.Sprintf("Normal v%d", version), &responseV3, nil)
+	}
+
+	responseV5 := OffsetFetchResponse{Version: 5, Err: ErrInvalidRequest, ThrottleTimeMs: 9}
+	responseV5.AddBlock("t", 0, &OffsetFetchResponseBlock{Offset: 10, LeaderEpoch: 100, Metadata: "md", Err: ErrRequestTimedOut})
+	responseV5.Blocks["m"] = nil
+	testResponse(t, "normal V5", &responseV5, nil)
 }