Browse Source

Added OffsetFetch V2 support

Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Edoardo Comar 6 years ago
parent
commit
a4ef905836
4 changed files with 93 additions and 38 deletions
  1. 17 10
      offset_fetch_request.go
  2. 12 1
      offset_fetch_request_test.go
  3. 51 27
      offset_fetch_response.go
  4. 13 0
      offset_fetch_response_test.go

+ 17 - 10
offset_fetch_request.go

@@ -1,28 +1,33 @@
 package sarama
 
 type OffsetFetchRequest struct {
-	ConsumerGroup string
 	Version       int16
+	ConsumerGroup string
 	partitions    map[string][]int32
 }
 
 func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
-	if r.Version < 0 || r.Version > 1 {
+	if r.Version < 0 || r.Version > 2 {
 		return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
 	}
 
 	if err = pe.putString(r.ConsumerGroup); err != nil {
 		return err
 	}
-	if err = pe.putArrayLength(len(r.partitions)); err != nil {
-		return err
-	}
-	for topic, partitions := range r.partitions {
-		if err = pe.putString(topic); err != nil {
+
+	if r.Version >= 2 && r.partitions == nil {
+		pe.putInt32(-1)
+	} else {
+		if err = pe.putArrayLength(len(r.partitions)); err != nil {
 			return err
 		}
-		if err = pe.putInt32Array(partitions); err != nil {
-			return err
+		for topic, partitions := range r.partitions {
+			if err = pe.putString(topic); err != nil {
+				return err
+			}
+			if err = pe.putInt32Array(partitions); err != nil {
+				return err
+			}
 		}
 	}
 	return nil
@@ -37,7 +42,7 @@ func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error)
 	if err != nil {
 		return err
 	}
-	if partitionCount == 0 {
+	if partitionCount <= 0 {
 		return nil
 	}
 	r.partitions = make(map[string][]int32)
@@ -67,6 +72,8 @@ func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 1:
 		return V0_8_2_0
+	case 2:
+		return V0_10_2_0
 	default:
 		return MinVersion
 	}

+ 12 - 1
offset_fetch_request_test.go

@@ -1,6 +1,8 @@
 package sarama
 
-import "testing"
+import (
+	"testing"
+)
 
 var (
 	offsetFetchRequestNoGroupNoPartitions = []byte{
@@ -17,6 +19,10 @@ var (
 		0x00, 0x0D, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't',
 		0x00, 0x00, 0x00, 0x01,
 		0x4F, 0x4F, 0x4F, 0x4F}
+
+	offsetFetchRequestAllPartitions = []byte{
+		0x00, 0x04, 'b', 'l', 'a', 'h',
+		0xff, 0xff, 0xff, 0xff}
 )
 
 func TestOffsetFetchRequest(t *testing.T) {
@@ -29,3 +35,8 @@ func TestOffsetFetchRequest(t *testing.T) {
 	request.AddPartition("topicTheFirst", 0x4F4F4F4F)
 	testRequest(t, "one partition", request, offsetFetchRequestOnePartition)
 }
+
+func TestOffsetFetchRequestAllPartitions(t *testing.T) {
+	requestV2 := &OffsetFetchRequest{Version: 2, ConsumerGroup: "blah"}
+	testRequest(t, "all partitions", requestV2, offsetFetchRequestAllPartitions)
+}

+ 51 - 27
offset_fetch_response.go

@@ -40,7 +40,9 @@ func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
 }
 
 type OffsetFetchResponse struct {
-	Blocks map[string]map[int32]*OffsetFetchResponseBlock
+	Version int16
+	Blocks  map[string]map[int32]*OffsetFetchResponseBlock
+	Err     KError
 }
 
 func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
@@ -61,48 +63,63 @@ func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
 			}
 		}
 	}
+	if r.Version >= 2 {
+		pe.putInt16(int16(r.Err))
+	}
 	return nil
 }
 
 func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
+	r.Version = version
+
 	numTopics, err := pd.getArrayLength()
-	if err != nil || numTopics == 0 {
+	if err != nil {
 		return err
 	}
 
-	r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
-	for i := 0; i < numTopics; i++ {
-		name, err := pd.getString()
-		if err != nil {
-			return err
-		}
-
-		numBlocks, err := pd.getArrayLength()
-		if err != nil {
-			return err
-		}
-
-		if numBlocks == 0 {
-			r.Blocks[name] = nil
-			continue
-		}
-		r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
-
-		for j := 0; j < numBlocks; j++ {
-			id, err := pd.getInt32()
+	if numTopics > 0 {
+		r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
+		for i := 0; i < numTopics; i++ {
+			name, err := pd.getString()
 			if err != nil {
 				return err
 			}
 
-			block := new(OffsetFetchResponseBlock)
-			err = block.decode(pd)
+			numBlocks, err := pd.getArrayLength()
 			if err != nil {
 				return err
 			}
-			r.Blocks[name][id] = block
+
+			if numBlocks == 0 {
+				r.Blocks[name] = nil
+				continue
+			}
+			r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
+
+			for j := 0; j < numBlocks; j++ {
+				id, err := pd.getInt32()
+				if err != nil {
+					return err
+				}
+
+				block := new(OffsetFetchResponseBlock)
+				err = block.decode(pd)
+				if err != nil {
+					return err
+				}
+				r.Blocks[name][id] = block
+			}
 		}
 	}
 
+	if version >= 2 {
+		kerr, err := pd.getInt16()
+		if err != nil {
+			return err
+		}
+		r.Err = KError(kerr)
+	}
+
 	return nil
 }
 
@@ -111,11 +128,18 @@ func (r *OffsetFetchResponse) key() int16 {
 }
 
 func (r *OffsetFetchResponse) version() int16 {
-	return 0
+	return r.Version
 }
 
 func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
-	return MinVersion
+	switch r.Version {
+	case 1:
+		return V0_8_2_0
+	case 2:
+		return V0_10_2_0
+	default:
+		return MinVersion
+	}
 }
 
 func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {

+ 13 - 0
offset_fetch_response_test.go

@@ -5,11 +5,17 @@ import "testing"
 var (
 	emptyOffsetFetchResponse = []byte{
 		0x00, 0x00, 0x00, 0x00}
+	emptyOffsetFetchResponseV2 = []byte{
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x2A}
 )
 
 func TestEmptyOffsetFetchResponse(t *testing.T) {
 	response := OffsetFetchResponse{}
 	testResponse(t, "empty", &response, emptyOffsetFetchResponse)
+
+	responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest}
+	testResponse(t, "emptyV2", &responseV2, emptyOffsetFetchResponseV2)
 }
 
 func TestNormalOffsetFetchResponse(t *testing.T) {
@@ -19,4 +25,11 @@ func TestNormalOffsetFetchResponse(t *testing.T) {
 	// The response encoded form cannot be checked for it varies due to
 	// unpredictable map traversal order.
 	testResponse(t, "normal", &response, nil)
+
+	responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest}
+	responseV2.AddBlock("t", 0, &OffsetFetchResponseBlock{0, "md", ErrRequestTimedOut})
+	responseV2.Blocks["m"] = nil
+	// The response encoded form cannot be checked for it varies due to
+	// unpredictable map traversal order.
+	testResponse(t, "normalV2", &responseV2, nil)
 }