Browse Source

Added support for Metadata Request/Response up to v5

Mickael Maison 7 years ago
parent
commit
bbdeda9fd2
4 changed files with 186 additions and 24 deletions
  1. 21 5
      metadata_request.go
  2. 32 0
      metadata_request_test.go
  3. 59 15
      metadata_response.go
  4. 74 4
      metadata_response_test.go

+ 21 - 5
metadata_request.go

@@ -1,12 +1,13 @@
 package sarama
 
 type MetadataRequest struct {
-	Version int16
-	Topics  []string
+	Version                int16
+	Topics                 []string
+	AllowAutoTopicCreation bool
 }
 
 func (r *MetadataRequest) encode(pe packetEncoder) error {
-	if r.Version < 0 || r.Version > 1 {
+	if r.Version < 0 || r.Version > 5 {
 		return PacketEncodingError{"invalid or unsupported MetadataRequest version field"}
 	}
 	if r.Version == 0 || r.Topics != nil || len(r.Topics) > 0 {
@@ -24,6 +25,9 @@ func (r *MetadataRequest) encode(pe packetEncoder) error {
 	} else {
 		pe.putInt32(-1)
 	}
+	if r.Version > 3 {
+		pe.putBool(r.AllowAutoTopicCreation)
+	}
 	return nil
 }
 
@@ -49,9 +53,15 @@ func (r *MetadataRequest) decode(pd packetDecoder, version int16) error {
 			}
 			r.Topics[i] = topic
 		}
-		return nil
 	}
-
+	if r.Version > 3 {
+		autoCreation, err := pd.getBool()
+		if err != nil {
+			return err
+		}
+		r.AllowAutoTopicCreation = autoCreation
+	}
+	return nil
 }
 
 func (r *MetadataRequest) key() int16 {
@@ -66,6 +76,12 @@ func (r *MetadataRequest) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 1:
 		return V0_10_0_0
+	case 2:
+		return V0_10_1_0
+	case 3, 4:
+		return V0_11_0_0
+	case 5:
+		return V1_0_0_0
 	default:
 		return MinVersion
 	}

+ 32 - 0
metadata_request_test.go

@@ -18,6 +18,9 @@ var (
 
 	metadataRequestNoTopicsV1 = []byte{
 		0xff, 0xff, 0xff, 0xff}
+
+	metadataRequestAutoCreateV4   = append(metadataRequestOneTopicV0, byte(1))
+	metadataRequestNoAutoCreateV4 = append(metadataRequestOneTopicV0, byte(0))
 )
 
 func TestMetadataRequestV0(t *testing.T) {
@@ -42,3 +45,32 @@ func TestMetadataRequestV1(t *testing.T) {
 	request.Topics = []string{"foo", "bar", "baz"}
 	testRequest(t, "three topics", request, metadataRequestThreeTopicsV0)
 }
+
+func TestMetadataRequestV2(t *testing.T) {
+	request := new(MetadataRequest)
+	request.Version = 2
+	testRequest(t, "no topics", request, metadataRequestNoTopicsV1)
+
+	request.Topics = []string{"topic1"}
+	testRequest(t, "one topic", request, metadataRequestOneTopicV0)
+}
+
+func TestMetadataRequestV3(t *testing.T) {
+	request := new(MetadataRequest)
+	request.Version = 3
+	testRequest(t, "no topics", request, metadataRequestNoTopicsV1)
+
+	request.Topics = []string{"topic1"}
+	testRequest(t, "one topic", request, metadataRequestOneTopicV0)
+}
+
+func TestMetadataRequestV4(t *testing.T) {
+	request := new(MetadataRequest)
+	request.Version = 4
+	request.Topics = []string{"topic1"}
+	request.AllowAutoTopicCreation = true
+	testRequest(t, "one topic", request, metadataRequestAutoCreateV4)
+
+	request.AllowAutoTopicCreation = false
+	testRequest(t, "one topic", request, metadataRequestNoAutoCreateV4)
+}

+ 59 - 15
metadata_response.go

@@ -1,14 +1,15 @@
 package sarama
 
 type PartitionMetadata struct {
-	Err      KError
-	ID       int32
-	Leader   int32
-	Replicas []int32
-	Isr      []int32
+	Err             KError
+	ID              int32
+	Leader          int32
+	Replicas        []int32
+	Isr             []int32
+	OfflineReplicas []int32
 }
 
-func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) {
+func (pm *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) {
 	tmp, err := pd.getInt16()
 	if err != nil {
 		return err
@@ -35,10 +36,17 @@ func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
+	if version >= 5 {
+		pm.OfflineReplicas, err = pd.getInt32Array()
+		if err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
 
-func (pm *PartitionMetadata) encode(pe packetEncoder) (err error) {
+func (pm *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) {
 	pe.putInt16(int16(pm.Err))
 	pe.putInt32(pm.ID)
 	pe.putInt32(pm.Leader)
@@ -53,6 +61,13 @@ func (pm *PartitionMetadata) encode(pe packetEncoder) (err error) {
 		return err
 	}
 
+	if version >= 5 {
+		err = pe.putInt32Array(pm.OfflineReplicas)
+		if err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
 
@@ -89,7 +104,7 @@ func (tm *TopicMetadata) decode(pd packetDecoder, version int16) (err error) {
 	tm.Partitions = make([]*PartitionMetadata, n)
 	for i := 0; i < n; i++ {
 		tm.Partitions[i] = new(PartitionMetadata)
-		err = tm.Partitions[i].decode(pd)
+		err = tm.Partitions[i].decode(pd, version)
 		if err != nil {
 			return err
 		}
@@ -116,7 +131,7 @@ func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
 	}
 
 	for _, pm := range tm.Partitions {
-		err = pm.encode(pe)
+		err = pm.encode(pe, version)
 		if err != nil {
 			return err
 		}
@@ -126,13 +141,24 @@ func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
 }
 
 type MetadataResponse struct {
-	Version      int16
-	Brokers      []*Broker
-	ControllerID int32
-	Topics       []*TopicMetadata
+	Version        int16
+	ThrottleTimeMs int32
+	Brokers        []*Broker
+	ClusterID      *string
+	ControllerID   int32
+	Topics         []*TopicMetadata
 }
 
 func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
+	r.Version = version
+
+	if version >= 3 {
+		r.ThrottleTimeMs, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+	}
+
 	n, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -147,6 +173,13 @@ func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
 		}
 	}
 
+	if version >= 2 {
+		r.ClusterID, err = pd.getNullableString()
+		if err != nil {
+			return err
+		}
+	}
+
 	if version >= 1 {
 		r.ControllerID, err = pd.getInt32()
 		if err != nil {
@@ -208,11 +241,22 @@ func (r *MetadataResponse) key() int16 {
 }
 
 func (r *MetadataResponse) version() int16 {
-	return 0
+	return r.Version
 }
 
 func (r *MetadataResponse) requiredVersion() KafkaVersion {
-	return MinVersion
+	switch r.Version {
+	case 1:
+		return V0_10_0_0
+	case 2:
+		return V0_10_1_0
+	case 3, 4:
+		return V0_11_0_0
+	case 5:
+		return V1_0_0_0
+	default:
+		return MinVersion
+	}
 }
 
 // testing API

+ 74 - 4
metadata_response_test.go

@@ -75,6 +75,31 @@ var (
 		0x00, 0x03, 'b', 'a', 'r',
 		0x01,
 		0x00, 0x00, 0x00, 0x00}
+
+	noBrokersNoTopicsWithThrottleTimeAndClusterIDV3 = []byte{
+		0x00, 0x00, 0x00, 0x10,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x09, 'c', 'l', 'u', 's', 't', 'e', 'r', 'I', 'd',
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x00}
+
+	noBrokersOneTopicWithOfflineReplicasV5 = []byte{
+		0x00, 0x00, 0x00, 0x05,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x09, 'c', 'l', 'u', 's', 't', 'e', 'r', 'I', 'd',
+		0x00, 0x00, 0x00, 0x02,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00,
+		0x00, 0x03, 'f', 'o', 'o',
+		0x00,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x04,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x07,
+		0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03,
+		0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02,
+		0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03,
+	}
 )
 
 func TestEmptyMetadataResponseV0(t *testing.T) {
@@ -206,15 +231,60 @@ func TestMetadataResponseWithTopicsV1(t *testing.T) {
 		t.Error("Decoding produced", len(response.Brokers), "brokers where there were none!")
 	}
 	if response.ControllerID != 4 {
-		t.Error("Decoding produced", len(response.Brokers), "should have been 4!")
+		t.Error("Decoding produced", response.ControllerID, "should have been 4!")
 	}
 	if len(response.Topics) != 2 {
-		t.Error("Decoding produced", len(response.Brokers), "topics where there were 2!")
+		t.Error("Decoding produced", len(response.Topics), "topics where there were 2!")
 	}
 	if response.Topics[0].IsInternal {
-		t.Error("Decoding produced", response.ControllerID, "topic0 should have been false!")
+		t.Error("Decoding produced", response.Topics[0], "topic0 should have been false!")
 	}
 	if !response.Topics[1].IsInternal {
-		t.Error("Decoding produced", response.ControllerID, "topic1 should have been true!")
+		t.Error("Decoding produced", response.Topics[1], "topic1 should have been true!")
+	}
+}
+
+func TestMetadataResponseWithThrottleTime(t *testing.T) {
+	response := MetadataResponse{}
+
+	testVersionDecodable(t, "no topics, no brokers, throttle time and cluster Id V3", &response, noBrokersNoTopicsWithThrottleTimeAndClusterIDV3, 3)
+	if response.ThrottleTimeMs != int32(16) {
+		t.Error("Decoding produced", response.ThrottleTimeMs, "should have been 16!")
+	}
+	if len(response.Brokers) != 0 {
+		t.Error("Decoding produced", response.Brokers, "should have been 0!")
+	}
+	if response.ControllerID != int32(1) {
+		t.Error("Decoding produced", response.ControllerID, "should have been 1!")
+	}
+	if *response.ClusterID != "clusterId" {
+		t.Error("Decoding produced", response.ClusterID, "should have been clusterId!")
+	}
+	if len(response.Topics) != 0 {
+		t.Error("Decoding produced", len(response.Topics), "should have been 0!")
+	}
+}
+
+func TestMetadataResponseWithOfflineReplicasV5(t *testing.T) {
+	response := MetadataResponse{}
+
+	testVersionDecodable(t, "no brokers, 1 topic with offline replica V5", &response, noBrokersOneTopicWithOfflineReplicasV5, 5)
+	if response.ThrottleTimeMs != int32(5) {
+		t.Error("Decoding produced", response.ThrottleTimeMs, "should have been 5!")
+	}
+	if len(response.Brokers) != 0 {
+		t.Error("Decoding produced", response.Brokers, "should have been 0!")
+	}
+	if response.ControllerID != int32(2) {
+		t.Error("Decoding produced", response.ControllerID, "should have been 21!")
+	}
+	if *response.ClusterID != "clusterId" {
+		t.Error("Decoding produced", response.ClusterID, "should have been clusterId!")
+	}
+	if len(response.Topics) != 1 {
+		t.Error("Decoding produced", len(response.Topics), "should have been 1!")
+	}
+	if len(response.Topics[0].Partitions[0].OfflineReplicas) != 1 {
+		t.Error("Decoding produced", len(response.Topics[0].Partitions[0].OfflineReplicas), "should have been 1!")
 	}
 }