Explorar o código

Add support for newer OffsetCommit request/response

Mickael Maison %!s(int64=5) %!d(string=hai) anos
pai
achega
3f16fb0e7a

+ 7 - 1
offset_commit_request.go

@@ -52,12 +52,14 @@ type OffsetCommitRequest struct {
 	// - 0 (kafka 0.8.1 and later)
 	// - 1 (kafka 0.8.2 and later)
 	// - 2 (kafka 0.9.0 and later)
+	// - 3 (kafka 0.11.0 and later)
+	// - 4 (kafka 2.0.0 and later)
 	Version int16
 	blocks  map[string]map[int32]*offsetCommitRequestBlock
 }
 
 func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
-	if r.Version < 0 || r.Version > 2 {
+	if r.Version < 0 || r.Version > 4 {
 		return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"}
 	}
 
@@ -174,6 +176,10 @@ func (r *OffsetCommitRequest) requiredVersion() KafkaVersion {
 		return V0_8_2_0
 	case 2:
 		return V0_9_0_0
+	case 3:
+		return V0_11_0_0
+	case 4:
+		return V2_0_0_0
 	default:
 		return MinVersion
 	}

+ 16 - 11
offset_commit_request_test.go

@@ -1,6 +1,9 @@
 package sarama
 
-import "testing"
+import (
+	"fmt"
+	"testing"
+)
 
 var (
 	offsetCommitRequestNoBlocksV0 = []byte{
@@ -76,15 +79,17 @@ func TestOffsetCommitRequestV1(t *testing.T) {
 	testRequest(t, "one block v1", request, offsetCommitRequestOneBlockV1)
 }
 
-func TestOffsetCommitRequestV2(t *testing.T) {
-	request := new(OffsetCommitRequest)
-	request.ConsumerGroup = "foobar"
-	request.ConsumerID = "cons"
-	request.ConsumerGroupGeneration = 0x1122
-	request.RetentionTime = 0x4433
-	request.Version = 2
-	testRequest(t, "no blocks v2", request, offsetCommitRequestNoBlocksV2)
+func TestOffsetCommitRequestV2ToV4(t *testing.T) {
+	for version := 2; version <= 4; version++ {
+		request := new(OffsetCommitRequest)
+		request.ConsumerGroup = "foobar"
+		request.ConsumerID = "cons"
+		request.ConsumerGroupGeneration = 0x1122
+		request.RetentionTime = 0x4433
+		request.Version = int16(version)
+		testRequest(t, fmt.Sprintf("no blocks v%d", version), request, offsetCommitRequestNoBlocksV2)
 
-	request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, "metadata")
-	testRequest(t, "one block v2", request, offsetCommitRequestOneBlockV2)
+		request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, "metadata")
+		testRequest(t, fmt.Sprintf("one block v%d", version), request, offsetCommitRequestOneBlockV2)
+	}
 }

+ 28 - 3
offset_commit_response.go

@@ -1,7 +1,9 @@
 package sarama
 
 type OffsetCommitResponse struct {
-	Errors map[string]map[int32]KError
+	Version        int16
+	ThrottleTimeMs int32
+	Errors         map[string]map[int32]KError
 }
 
 func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) {
@@ -17,6 +19,9 @@ func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KE
 }
 
 func (r *OffsetCommitResponse) encode(pe packetEncoder) error {
+	if r.Version >= 3 {
+		pe.putInt32(r.ThrottleTimeMs)
+	}
 	if err := pe.putArrayLength(len(r.Errors)); err != nil {
 		return err
 	}
@@ -36,6 +41,15 @@ func (r *OffsetCommitResponse) encode(pe packetEncoder) error {
 }
 
 func (r *OffsetCommitResponse) decode(pd packetDecoder, version int16) (err error) {
+	r.Version = version
+
+	if version >= 3 {
+		r.ThrottleTimeMs, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+	}
+
 	numTopics, err := pd.getArrayLength()
 	if err != nil || numTopics == 0 {
 		return err
@@ -77,9 +91,20 @@ func (r *OffsetCommitResponse) key() int16 {
 }
 
 func (r *OffsetCommitResponse) version() int16 {
-	return 0
+	return r.Version
 }
 
 func (r *OffsetCommitResponse) requiredVersion() KafkaVersion {
-	return MinVersion
+	switch r.Version {
+	case 1:
+		return V0_8_2_0
+	case 2:
+		return V0_9_0_0
+	case 3:
+		return V0_11_0_0
+	case 4:
+		return V2_0_0_0
+	default:
+		return MinVersion
+	}
 }

+ 15 - 0
offset_commit_response_test.go

@@ -1,6 +1,7 @@
 package sarama
 
 import (
+	"fmt"
 	"testing"
 )
 
@@ -22,3 +23,17 @@ func TestNormalOffsetCommitResponse(t *testing.T) {
 	// unpredictable map traversal order.
 	testResponse(t, "normal", &response, nil)
 }
+
+func TestOffsetCommitResponseWithThrottleTime(t *testing.T) {
+	for version := 3; version <= 4; version++ {
+		response := OffsetCommitResponse{
+			Version:        int16(version),
+			ThrottleTimeMs: 123,
+		}
+		response.AddError("t", 0, ErrNotLeaderForPartition)
+		response.Errors["m"] = make(map[int32]KError)
+		// The response encoded form cannot be checked for it varies due to
+		// unpredictable map traversal order.
+		testResponse(t, fmt.Sprintf("v%d with throttle time", version), &response, nil)
+	}
+}