Browse Source

Added support for Metadata Request/Response v1

V1 enables to discover the cluster controller which is required to send CreateTopics, DeleteTopics and CreatePartitions requests

Co-authored-by: Adrian Preston <prestona@uk.ibm.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Mickael Maison 7 years ago
parent
commit
835ee8959d
5 changed files with 194 additions and 43 deletions
  1. 9 1
      broker.go
  2. 39 19
      metadata_request.go
  3. 22 7
      metadata_request_test.go
  4. 34 7
      metadata_response.go
  5. 90 9
      metadata_response_test.go

+ 9 - 1
broker.go

@@ -18,6 +18,7 @@ import (
 type Broker struct {
 	id   int32
 	addr string
+	rack *string
 
 	conf          *Config
 	correlationID int32
@@ -592,7 +593,7 @@ func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
 	}
 }
 
-func (b *Broker) decode(pd packetDecoder) (err error) {
+func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
 	b.id, err = pd.getInt32()
 	if err != nil {
 		return err
@@ -608,6 +609,13 @@ func (b *Broker) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
+	if version >= 1 {
+		b.rack, err = pd.getNullableString()
+		if err != nil {
+			return err
+		}
+	}
+
 	b.addr = net.JoinHostPort(host, fmt.Sprint(port))
 	if _, _, err := net.SplitHostPort(b.addr); err != nil {
 		return err

+ 39 - 19
metadata_request.go

@@ -1,42 +1,57 @@
 package sarama
 
 type MetadataRequest struct {
-	Topics []string
+	Version int16
+	Topics  []string
 }
 
 func (r *MetadataRequest) encode(pe packetEncoder) error {
-	err := pe.putArrayLength(len(r.Topics))
-	if err != nil {
-		return err
+	if r.Version < 0 || r.Version > 1 {
+		return PacketEncodingError{"invalid or unsupported MetadataRequest version field"}
 	}
-
-	for i := range r.Topics {
-		err = pe.putString(r.Topics[i])
+	if r.Version == 0 || r.Topics != nil || len(r.Topics) > 0 {
+		err := pe.putArrayLength(len(r.Topics))
 		if err != nil {
 			return err
 		}
+
+		for i := range r.Topics {
+			err = pe.putString(r.Topics[i])
+			if err != nil {
+				return err
+			}
+		}
+	} else {
+		pe.putInt32(-1)
 	}
 	return nil
 }
 
 func (r *MetadataRequest) decode(pd packetDecoder, version int16) error {
-	topicCount, err := pd.getArrayLength()
+	r.Version = version
+	size, err := pd.getInt32()
 	if err != nil {
 		return err
 	}
-	if topicCount == 0 {
+	if size < 0 {
 		return nil
-	}
+	} else {
+		topicCount := size
+		if topicCount == 0 {
+			return nil
+		}
 
-	r.Topics = make([]string, topicCount)
-	for i := range r.Topics {
-		topic, err := pd.getString()
-		if err != nil {
-			return err
+		r.Topics = make([]string, topicCount)
+		for i := range r.Topics {
+			topic, err := pd.getString()
+			if err != nil {
+				return err
+			}
+			r.Topics[i] = topic
 		}
-		r.Topics[i] = topic
+		return nil
 	}
-	return nil
+
 }
 
 func (r *MetadataRequest) key() int16 {
@@ -44,9 +59,14 @@ func (r *MetadataRequest) key() int16 {
 }
 
 func (r *MetadataRequest) version() int16 {
-	return 0
+	return r.Version
 }
 
 func (r *MetadataRequest) requiredVersion() KafkaVersion {
-	return MinVersion
+	switch r.Version {
+	case 1:
+		return V0_10_0_0
+	default:
+		return MinVersion
+	}
 }

+ 22 - 7
metadata_request_test.go

@@ -3,27 +3,42 @@ package sarama
 import "testing"
 
 var (
-	metadataRequestNoTopics = []byte{
+	metadataRequestNoTopicsV0 = []byte{
 		0x00, 0x00, 0x00, 0x00}
 
-	metadataRequestOneTopic = []byte{
+	metadataRequestOneTopicV0 = []byte{
 		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x06, 't', 'o', 'p', 'i', 'c', '1'}
 
-	metadataRequestThreeTopics = []byte{
+	metadataRequestThreeTopicsV0 = []byte{
 		0x00, 0x00, 0x00, 0x03,
 		0x00, 0x03, 'f', 'o', 'o',
 		0x00, 0x03, 'b', 'a', 'r',
 		0x00, 0x03, 'b', 'a', 'z'}
+
+	metadataRequestNoTopicsV1 = []byte{
+		0xff, 0xff, 0xff, 0xff}
 )
 
-func TestMetadataRequest(t *testing.T) {
+func TestMetadataRequestV0(t *testing.T) {
+	request := new(MetadataRequest)
+	testRequest(t, "no topics", request, metadataRequestNoTopicsV0)
+
+	request.Topics = []string{"topic1"}
+	testRequest(t, "one topic", request, metadataRequestOneTopicV0)
+
+	request.Topics = []string{"foo", "bar", "baz"}
+	testRequest(t, "three topics", request, metadataRequestThreeTopicsV0)
+}
+
+func TestMetadataRequestV1(t *testing.T) {
 	request := new(MetadataRequest)
-	testRequest(t, "no topics", request, metadataRequestNoTopics)
+	request.Version = 1
+	testRequest(t, "no topics", request, metadataRequestNoTopicsV1)
 
 	request.Topics = []string{"topic1"}
-	testRequest(t, "one topic", request, metadataRequestOneTopic)
+	testRequest(t, "one topic", request, metadataRequestOneTopicV0)
 
 	request.Topics = []string{"foo", "bar", "baz"}
-	testRequest(t, "three topics", request, metadataRequestThreeTopics)
+	testRequest(t, "three topics", request, metadataRequestThreeTopicsV0)
 }

+ 34 - 7
metadata_response.go

@@ -59,10 +59,11 @@ func (pm *PartitionMetadata) encode(pe packetEncoder) (err error) {
 type TopicMetadata struct {
 	Err        KError
 	Name       string
+	IsInternal bool // Only valid for Version >= 1
 	Partitions []*PartitionMetadata
 }
 
-func (tm *TopicMetadata) decode(pd packetDecoder) (err error) {
+func (tm *TopicMetadata) decode(pd packetDecoder, version int16) (err error) {
 	tmp, err := pd.getInt16()
 	if err != nil {
 		return err
@@ -74,6 +75,13 @@ func (tm *TopicMetadata) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
+	if version >= 1 {
+		tm.IsInternal, err = pd.getBool()
+		if err != nil {
+			return err
+		}
+	}
+
 	n, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -90,7 +98,7 @@ func (tm *TopicMetadata) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
-func (tm *TopicMetadata) encode(pe packetEncoder) (err error) {
+func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
 	pe.putInt16(int16(tm.Err))
 
 	err = pe.putString(tm.Name)
@@ -98,6 +106,10 @@ func (tm *TopicMetadata) encode(pe packetEncoder) (err error) {
 		return err
 	}
 
+	if version >= 1 {
+		pe.putBool(tm.IsInternal)
+	}
+
 	err = pe.putArrayLength(len(tm.Partitions))
 	if err != nil {
 		return err
@@ -114,8 +126,10 @@ func (tm *TopicMetadata) encode(pe packetEncoder) (err error) {
 }
 
 type MetadataResponse struct {
-	Brokers []*Broker
-	Topics  []*TopicMetadata
+	Version      int16
+	Brokers      []*Broker
+	ControllerID int32
+	Topics       []*TopicMetadata
 }
 
 func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
@@ -127,12 +141,21 @@ func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
 	r.Brokers = make([]*Broker, n)
 	for i := 0; i < n; i++ {
 		r.Brokers[i] = new(Broker)
-		err = r.Brokers[i].decode(pd)
+		err = r.Brokers[i].decode(pd, version)
 		if err != nil {
 			return err
 		}
 	}
 
+	if version >= 1 {
+		r.ControllerID, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+	} else {
+		r.ControllerID = -1
+	}
+
 	n, err = pd.getArrayLength()
 	if err != nil {
 		return err
@@ -141,7 +164,7 @@ func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
 	r.Topics = make([]*TopicMetadata, n)
 	for i := 0; i < n; i++ {
 		r.Topics[i] = new(TopicMetadata)
-		err = r.Topics[i].decode(pd)
+		err = r.Topics[i].decode(pd, version)
 		if err != nil {
 			return err
 		}
@@ -162,12 +185,16 @@ func (r *MetadataResponse) encode(pe packetEncoder) error {
 		}
 	}
 
+	if r.Version >= 1 {
+		pe.putInt32(r.ControllerID)
+	}
+
 	err = pe.putArrayLength(len(r.Topics))
 	if err != nil {
 		return err
 	}
 	for _, tm := range r.Topics {
-		err = tm.encode(pe)
+		err = tm.encode(pe, r.Version)
 		if err != nil {
 			return err
 		}

+ 90 - 9
metadata_response_test.go

@@ -3,11 +3,11 @@ package sarama
 import "testing"
 
 var (
-	emptyMetadataResponse = []byte{
+	emptyMetadataResponseV0 = []byte{
 		0x00, 0x00, 0x00, 0x00,
 		0x00, 0x00, 0x00, 0x00}
 
-	brokersNoTopicsMetadataResponse = []byte{
+	brokersNoTopicsMetadataResponseV0 = []byte{
 		0x00, 0x00, 0x00, 0x02,
 
 		0x00, 0x00, 0xab, 0xff,
@@ -20,7 +20,7 @@ var (
 
 		0x00, 0x00, 0x00, 0x00}
 
-	topicsNoBrokersMetadataResponse = []byte{
+	topicsNoBrokersMetadataResponseV0 = []byte{
 		0x00, 0x00, 0x00, 0x00,
 		0x00, 0x00, 0x00, 0x02,
 
@@ -36,12 +36,51 @@ var (
 		0x00, 0x00,
 		0x00, 0x03, 'b', 'a', 'r',
 		0x00, 0x00, 0x00, 0x00}
+
+	brokersNoTopicsMetadataResponseV1 = []byte{
+		0x00, 0x00, 0x00, 0x02,
+
+		0x00, 0x00, 0xab, 0xff,
+		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
+		0x00, 0x00, 0x00, 0x33,
+		0x00, 0x05, 'r', 'a', 'c', 'k', '0',
+
+		0x00, 0x01, 0x02, 0x03,
+		0x00, 0x0a, 'g', 'o', 'o', 'g', 'l', 'e', '.', 'c', 'o', 'm',
+		0x00, 0x00, 0x01, 0x11,
+		0x00, 0x05, 'r', 'a', 'c', 'k', '1',
+
+		0x00, 0x00, 0x00, 0x01,
+
+		0x00, 0x00, 0x00, 0x00}
+
+	topicsNoBrokersMetadataResponseV1 = []byte{
+		0x00, 0x00, 0x00, 0x00,
+
+		0x00, 0x00, 0x00, 0x04,
+
+		0x00, 0x00, 0x00, 0x02,
+
+		0x00, 0x00,
+		0x00, 0x03, 'f', 'o', 'o',
+		0x00,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x04,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x07,
+		0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03,
+		0x00, 0x00, 0x00, 0x00,
+
+		0x00, 0x00,
+		0x00, 0x03, 'b', 'a', 'r',
+		0x01,
+		0x00, 0x00, 0x00, 0x00}
 )
 
-func TestEmptyMetadataResponse(t *testing.T) {
+func TestEmptyMetadataResponseV0(t *testing.T) {
 	response := MetadataResponse{}
 
-	testVersionDecodable(t, "empty", &response, emptyMetadataResponse, 0)
+	testVersionDecodable(t, "empty, V0", &response, emptyMetadataResponseV0, 0)
 	if len(response.Brokers) != 0 {
 		t.Error("Decoding produced", len(response.Brokers), "brokers where there were none!")
 	}
@@ -50,10 +89,10 @@ func TestEmptyMetadataResponse(t *testing.T) {
 	}
 }
 
-func TestMetadataResponseWithBrokers(t *testing.T) {
+func TestMetadataResponseWithBrokersV0(t *testing.T) {
 	response := MetadataResponse{}
 
-	testVersionDecodable(t, "brokers, no topics", &response, brokersNoTopicsMetadataResponse, 0)
+	testVersionDecodable(t, "brokers, no topics, V0", &response, brokersNoTopicsMetadataResponseV0, 0)
 	if len(response.Brokers) != 2 {
 		t.Fatal("Decoding produced", len(response.Brokers), "brokers where there were two!")
 	}
@@ -76,10 +115,10 @@ func TestMetadataResponseWithBrokers(t *testing.T) {
 	}
 }
 
-func TestMetadataResponseWithTopics(t *testing.T) {
+func TestMetadataResponseWithTopicsV0(t *testing.T) {
 	response := MetadataResponse{}
 
-	testVersionDecodable(t, "topics, no brokers", &response, topicsNoBrokersMetadataResponse, 0)
+	testVersionDecodable(t, "topics, no brokers, V0", &response, topicsNoBrokersMetadataResponseV0, 0)
 	if len(response.Brokers) != 0 {
 		t.Error("Decoding produced", len(response.Brokers), "brokers where there were none!")
 	}
@@ -137,3 +176,45 @@ func TestMetadataResponseWithTopics(t *testing.T) {
 		t.Error("Decoding produced invalid partition count for topic 1.")
 	}
 }
+
+func TestMetadataResponseWithBrokersV1(t *testing.T) {
+	response := MetadataResponse{}
+
+	testVersionDecodable(t, "topics, V1", &response, brokersNoTopicsMetadataResponseV1, 1)
+	if len(response.Brokers) != 2 {
+		t.Error("Decoding produced", len(response.Brokers), "brokers where there were 2!")
+	}
+	if response.Brokers[0].rack == nil || *response.Brokers[0].rack != "rack0" {
+		t.Error("Decoding produced invalid broker 0 rack.")
+	}
+	if response.Brokers[1].rack == nil || *response.Brokers[1].rack != "rack1" {
+		t.Error("Decoding produced invalid broker 1 rack.")
+	}
+	if response.ControllerID != 1 {
+		t.Error("Decoding produced", response.ControllerID, "should have been 1!")
+	}
+	if len(response.Topics) != 0 {
+		t.Error("Decoding produced", len(response.Brokers), "brokers where there were none!")
+	}
+}
+
+func TestMetadataResponseWithTopicsV1(t *testing.T) {
+	response := MetadataResponse{}
+
+	testVersionDecodable(t, "topics, V1", &response, topicsNoBrokersMetadataResponseV1, 1)
+	if len(response.Brokers) != 0 {
+		t.Error("Decoding produced", len(response.Brokers), "brokers where there were none!")
+	}
+	if response.ControllerID != 4 {
+		t.Error("Decoding produced", len(response.Brokers), "should have been 4!")
+	}
+	if len(response.Topics) != 2 {
+		t.Error("Decoding produced", len(response.Brokers), "topics where there were 2!")
+	}
+	if response.Topics[0].IsInternal {
+		t.Error("Decoding produced", response.ControllerID, "topic0 should have been false!")
+	}
+	if !response.Topics[1].IsInternal {
+		t.Error("Decoding produced", response.ControllerID, "topic1 should have been true!")
+	}
+}