فهرست منبع

Implement DescribeConfigsResponse 1 & 2

Conor Mongey 7 سال پیش
والد
کامیت
f9494b44e1
2فایلهای تغییر یافته به همراه260 افزوده شده و 17 حذف شده
  1. 145 13
      describe_configs_response.go
  2. 115 4
      describe_configs_response_test.go

+ 145 - 13
describe_configs_response.go

@@ -1,8 +1,41 @@
 package sarama
 
-import "time"
+import (
+	"fmt"
+	"time"
+)
+
+type ConfigSource int8
+
+func (s ConfigSource) String() string {
+	switch s {
+	case SourceUnknown:
+		return "Unknown"
+	case SourceTopic:
+		return "Topic"
+	case SourceDynamicBroker:
+		return "DynamicBroker"
+	case SourceDynamicDefaultBroker:
+		return "DynamicDefaultBroker"
+	case SourceStaticBroker:
+		return "StaticBroker"
+	case SourceDefault:
+		return "Default"
+	}
+	return fmt.Sprintf("Source Invalid: %d", int(s))
+}
+
+const (
+	SourceUnknown              ConfigSource = 0
+	SourceTopic                ConfigSource = 1
+	SourceDynamicBroker        ConfigSource = 2
+	SourceDynamicDefaultBroker ConfigSource = 3
+	SourceStaticBroker         ConfigSource = 4
+	SourceDefault              ConfigSource = 5
+)
 
 type DescribeConfigsResponse struct {
+	Version      int16
 	ThrottleTime time.Duration
 	Resources    []*ResourceResponse
 }
@@ -20,7 +53,15 @@ type ConfigEntry struct {
 	Value     string
 	ReadOnly  bool
 	Default   bool
+	Source    ConfigSource
 	Sensitive bool
+	Synonyms  []*ConfigSynonym
+}
+
+type ConfigSynonym struct {
+	ConfigName  string
+	ConfigValue string
+	Source      ConfigSource
 }
 
 func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
@@ -30,14 +71,16 @@ func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
 	}
 
 	for _, c := range r.Resources {
-		if err = c.encode(pe); err != nil {
+		if err = c.encode(pe, r.Version); err != nil {
 			return err
 		}
 	}
+
 	return nil
 }
 
 func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) {
+	r.Version = version
 	throttleTime, err := pd.getInt32()
 	if err != nil {
 		return err
@@ -66,14 +109,21 @@ func (r *DescribeConfigsResponse) key() int16 {
 }
 
 func (r *DescribeConfigsResponse) version() int16 {
-	return 0
+	return r.Version
 }
 
 func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
-	return V0_11_0_0
+	switch r.Version {
+	case 1:
+		return V1_0_0_0
+	case 2:
+		return V2_0_0_0
+	default:
+		return V0_11_0_0
+	}
 }
 
-func (r *ResourceResponse) encode(pe packetEncoder) (err error) {
+func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) {
 	pe.putInt16(r.ErrorCode)
 
 	if err = pe.putString(r.ErrorMsg); err != nil {
@@ -91,7 +141,7 @@ func (r *ResourceResponse) encode(pe packetEncoder) (err error) {
 	}
 
 	for _, c := range r.Configs {
-		if err = c.encode(pe); err != nil {
+		if err = c.encode(pe, version); err != nil {
 			return err
 		}
 	}
@@ -139,7 +189,7 @@ func (r *ResourceResponse) decode(pd packetDecoder, version int16) (err error) {
 	return nil
 }
 
-func (r *ConfigEntry) encode(pe packetEncoder) (err error) {
+func (r *ConfigEntry) encode(pe packetEncoder, version int16) (err error) {
 	if err = pe.putString(r.Name); err != nil {
 		return err
 	}
@@ -149,12 +199,32 @@ func (r *ConfigEntry) encode(pe packetEncoder) (err error) {
 	}
 
 	pe.putBool(r.ReadOnly)
-	pe.putBool(r.Default)
-	pe.putBool(r.Sensitive)
+
+	if version <= 0 {
+		pe.putBool(r.Default)
+		pe.putBool(r.Sensitive)
+	} else {
+		pe.putInt8(int8(r.Source))
+		pe.putBool(r.Sensitive)
+
+		if err := pe.putArrayLength(len(r.Synonyms)); err != nil {
+			return err
+		}
+		for _, c := range r.Synonyms {
+			if err = c.encode(pe, version); err != nil {
+				return err
+			}
+		}
+	}
+
 	return nil
 }
 
+//https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
 func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
+	if version == 0 {
+		r.Source = SourceUnknown
+	}
 	name, err := pd.getString()
 	if err != nil {
 		return err
@@ -173,16 +243,78 @@ func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
 	}
 	r.ReadOnly = read
 
-	de, err := pd.getBool()
-	if err != nil {
-		return err
+	if version == 0 {
+		defaultB, err := pd.getBool()
+		if err != nil {
+			return err
+		}
+		r.Default = defaultB
+	} else {
+		source, err := pd.getInt8()
+		if err != nil {
+			return err
+		}
+		r.Source = ConfigSource(source)
 	}
-	r.Default = de
 
 	sensitive, err := pd.getBool()
 	if err != nil {
 		return err
 	}
 	r.Sensitive = sensitive
+
+	if version > 0 {
+		n, err := pd.getArrayLength()
+		if err != nil {
+			return err
+		}
+		r.Synonyms = make([]*ConfigSynonym, n)
+
+		for i := 0; i < n; i++ {
+			s := &ConfigSynonym{}
+			if err := s.decode(pd, version); err != nil {
+				return err
+			}
+			r.Synonyms[i] = s
+		}
+
+	}
+	return nil
+}
+
+func (c *ConfigSynonym) encode(pe packetEncoder, version int16) (err error) {
+	err = pe.putString(c.ConfigName)
+	if err != nil {
+		return err
+	}
+
+	err = pe.putString(c.ConfigValue)
+	if err != nil {
+		return err
+	}
+
+	pe.putInt8(int8(c.Source))
+
+	return nil
+}
+
+func (c *ConfigSynonym) decode(pd packetDecoder, version int16) error {
+	name, err := pd.getString()
+	if err != nil {
+		return nil
+	}
+	c.ConfigName = name
+
+	value, err := pd.getString()
+	if err != nil {
+		return nil
+	}
+	c.ConfigValue = value
+
+	source, err := pd.getInt8()
+	if err != nil {
+		return nil
+	}
+	c.Source = ConfigSource(source)
 	return nil
 }

+ 115 - 4
describe_configs_response_test.go

@@ -10,7 +10,7 @@ var (
 		0, 0, 0, 0, // no configs
 	}
 
-	describeConfigsResponsePopulated = []byte{
+	describeConfigsResponsePopulatedv0 = []byte{
 		0, 0, 0, 0, //throttle
 		0, 0, 0, 1, // response
 		0, 0, //errorcode
@@ -24,9 +24,44 @@ var (
 		0, // Default
 		0, // Sensitive
 	}
+
+	describeConfigsResponsePopulatedv1 = []byte{
+		0, 0, 0, 0, //throttle
+		0, 0, 0, 1, // response
+		0, 0, //errorcode
+		0, 0, //string
+		2, // topic
+		0, 3, 'f', 'o', 'o',
+		0, 0, 0, 1, //configs
+		0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
+		0, 4, '1', '0', '0', '0',
+		0,          // ReadOnly
+		4,          // Source
+		0,          // Sensitive
+		0, 0, 0, 0, // No Synonym
+	}
+
+	describeConfigsResponseWithSynonymv1 = []byte{
+		0, 0, 0, 0, //throttle
+		0, 0, 0, 1, // response
+		0, 0, //errorcode
+		0, 0, //string
+		2, // topic
+		0, 3, 'f', 'o', 'o',
+		0, 0, 0, 1, //configs
+		0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
+		0, 4, '1', '0', '0', '0',
+		0,          // ReadOnly
+		4,          // Source
+		0,          // Sensitive
+		0, 0, 0, 1, // 1 Synonym
+		0, 14, 'l', 'o', 'g', '.', 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
+		0, 4, '1', '0', '0', '0',
+		4, // Source
+	}
 )
 
-func TestDescribeConfigsResponse(t *testing.T) {
+func TestDescribeConfigsResponsev0(t *testing.T) {
 	var response *DescribeConfigsResponse
 
 	response = &DescribeConfigsResponse{
@@ -38,7 +73,7 @@ func TestDescribeConfigsResponse(t *testing.T) {
 	}
 
 	response = &DescribeConfigsResponse{
-		Resources: []*ResourceResponse{
+		Version: 0, Resources: []*ResourceResponse{
 			&ResourceResponse{
 				ErrorCode: 0,
 				ErrorMsg:  "",
@@ -56,5 +91,81 @@ func TestDescribeConfigsResponse(t *testing.T) {
 			},
 		},
 	}
-	testResponse(t, "response with error", response, describeConfigsResponsePopulated)
+	testResponse(t, "response with error", response, describeConfigsResponsePopulatedv0)
+}
+
+func TestDescribeConfigsResponsev1(t *testing.T) {
+	var response *DescribeConfigsResponse
+
+	response = &DescribeConfigsResponse{
+		Resources: []*ResourceResponse{},
+	}
+	testVersionDecodable(t, "empty", response, describeConfigsResponseEmpty, 0)
+	if len(response.Resources) != 0 {
+		t.Error("Expected no groups")
+	}
+
+	response = &DescribeConfigsResponse{
+		Version: 1,
+		Resources: []*ResourceResponse{
+			&ResourceResponse{
+				ErrorCode: 0,
+				ErrorMsg:  "",
+				Type:      TopicResource,
+				Name:      "foo",
+				Configs: []*ConfigEntry{
+					&ConfigEntry{
+						Name:      "segment.ms",
+						Value:     "1000",
+						ReadOnly:  false,
+						Source:    SourceStaticBroker,
+						Sensitive: false,
+						Synonyms:  []*ConfigSynonym{},
+					},
+				},
+			},
+		},
+	}
+	testResponse(t, "response with error", response, describeConfigsResponsePopulatedv1)
+}
+
+func TestDescribeConfigsResponseWithSynonym(t *testing.T) {
+	var response *DescribeConfigsResponse
+
+	response = &DescribeConfigsResponse{
+		Resources: []*ResourceResponse{},
+	}
+	testVersionDecodable(t, "empty", response, describeConfigsResponseEmpty, 0)
+	if len(response.Resources) != 0 {
+		t.Error("Expected no groups")
+	}
+
+	response = &DescribeConfigsResponse{
+		Version: 1,
+		Resources: []*ResourceResponse{
+			&ResourceResponse{
+				ErrorCode: 0,
+				ErrorMsg:  "",
+				Type:      TopicResource,
+				Name:      "foo",
+				Configs: []*ConfigEntry{
+					&ConfigEntry{
+						Name:      "segment.ms",
+						Value:     "1000",
+						ReadOnly:  false,
+						Source:    SourceStaticBroker,
+						Sensitive: false,
+						Synonyms: []*ConfigSynonym{
+							{
+								ConfigName:  "log.segment.ms",
+								ConfigValue: "1000",
+								Source:      SourceStaticBroker,
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	testResponse(t, "response with error", response, describeConfigsResponseWithSynonymv1)
 }