Browse Source

Add Describe + AlterConfigs (#1014)

Add DescribeConfig and AlterConfig
Conor Mongey 7 years ago
parent
commit
f7466ea37d

+ 120 - 0
alter_configs_request.go

@@ -0,0 +1,120 @@
+package sarama
+
+type AlterConfigsRequest struct {
+	Resources    []*AlterConfigsResource
+	ValidateOnly bool
+}
+
+type AlterConfigsResource struct {
+	Type          ConfigResourceType
+	Name          string
+	ConfigEntries map[string]*string
+}
+
+func (acr *AlterConfigsRequest) encode(pe packetEncoder) error {
+	if err := pe.putArrayLength(len(acr.Resources)); err != nil {
+		return err
+	}
+
+	for _, r := range acr.Resources {
+		if err := r.encode(pe); err != nil {
+			return err
+		}
+	}
+
+	pe.putBool(acr.ValidateOnly)
+	return nil
+}
+
+func (acr *AlterConfigsRequest) decode(pd packetDecoder, version int16) error {
+	resourceCount, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	acr.Resources = make([]*AlterConfigsResource, resourceCount)
+	for i := range acr.Resources {
+		r := &AlterConfigsResource{}
+		err = r.decode(pd, version)
+		if err != nil {
+			return err
+		}
+		acr.Resources[i] = r
+	}
+
+	validateOnly, err := pd.getBool()
+	if err != nil {
+		return err
+	}
+
+	acr.ValidateOnly = validateOnly
+
+	return nil
+}
+
+func (ac *AlterConfigsResource) encode(pe packetEncoder) error {
+	pe.putInt8(int8(ac.Type))
+
+	if err := pe.putString(ac.Name); err != nil {
+		return err
+	}
+
+	if err := pe.putArrayLength(len(ac.ConfigEntries)); err != nil {
+		return err
+	}
+	for configKey, configValue := range ac.ConfigEntries {
+		if err := pe.putString(configKey); err != nil {
+			return err
+		}
+		if err := pe.putNullableString(configValue); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (ac *AlterConfigsResource) decode(pd packetDecoder, version int16) error {
+	t, err := pd.getInt8()
+	if err != nil {
+		return err
+	}
+	ac.Type = ConfigResourceType(t)
+
+	name, err := pd.getString()
+	if err != nil {
+		return err
+	}
+	ac.Name = name
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	if n > 0 {
+		ac.ConfigEntries = make(map[string]*string, n)
+		for i := 0; i < n; i++ {
+			configKey, err := pd.getString()
+			if err != nil {
+				return err
+			}
+			if ac.ConfigEntries[configKey], err = pd.getNullableString(); err != nil {
+				return err
+			}
+		}
+	}
+	return err
+}
+
+func (acr *AlterConfigsRequest) key() int16 {
+	return 33
+}
+
+func (acr *AlterConfigsRequest) version() int16 {
+	return 0
+}
+
+func (acr *AlterConfigsRequest) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}

+ 91 - 0
alter_configs_request_test.go

@@ -0,0 +1,91 @@
+package sarama
+
+import "testing"
+
+var (
+	emptyAlterConfigsRequest = []byte{
+		0, 0, 0, 0, // 0 configs
+		0, // don't Validate
+	}
+
+	singleAlterConfigsRequest = []byte{
+		0, 0, 0, 1, // 1 config
+		2,                   // a topic
+		0, 3, 'f', 'o', 'o', // topic name: foo
+		0, 0, 0, 1, //1 config name
+		0, 10, // 10 chars
+		's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
+		0, 4,
+		'1', '0', '0', '0',
+		0, // don't validate
+	}
+
+	doubleAlterConfigsRequest = []byte{
+		0, 0, 0, 2, // 2 config
+		2,                   // a topic
+		0, 3, 'f', 'o', 'o', // topic name: foo
+		0, 0, 0, 1, //1 config name
+		0, 10, // 10 chars
+		's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
+		0, 4,
+		'1', '0', '0', '0',
+		2,                   // a topic
+		0, 3, 'b', 'a', 'r', // topic name: foo
+		0, 0, 0, 2, //2 config
+		0, 10, // 10 chars
+		's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
+		0, 4,
+		'1', '0', '0', '0',
+		0, 12, // 12 chars
+		'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's',
+		0, 4,
+		'1', '0', '0', '0',
+		0, // don't validate
+	}
+)
+
+func TestAlterConfigsRequest(t *testing.T) {
+	var request *AlterConfigsRequest
+
+	request = &AlterConfigsRequest{
+		Resources: []*AlterConfigsResource{},
+	}
+	testRequest(t, "no requests", request, emptyAlterConfigsRequest)
+
+	configValue := "1000"
+	request = &AlterConfigsRequest{
+		Resources: []*AlterConfigsResource{
+			&AlterConfigsResource{
+				Type: TopicResource,
+				Name: "foo",
+				ConfigEntries: map[string]*string{
+					"segment.ms": &configValue,
+				},
+			},
+		},
+	}
+
+	testRequest(t, "one config", request, singleAlterConfigsRequest)
+
+	request = &AlterConfigsRequest{
+		Resources: []*AlterConfigsResource{
+			&AlterConfigsResource{
+				Type: TopicResource,
+				Name: "foo",
+				ConfigEntries: map[string]*string{
+					"segment.ms": &configValue,
+				},
+			},
+			&AlterConfigsResource{
+				Type: TopicResource,
+				Name: "bar",
+				ConfigEntries: map[string]*string{
+					"segment.ms":   &configValue,
+					"retention.ms": &configValue,
+				},
+			},
+		},
+	}
+
+	testRequest(t, "two configs", request, doubleAlterConfigsRequest)
+}

+ 95 - 0
alter_configs_response.go

@@ -0,0 +1,95 @@
+package sarama
+
+import "time"
+
+type AlterConfigsResponse struct {
+	ThrottleTime time.Duration
+	Resources    []*AlterConfigsResourceResponse
+}
+
+type AlterConfigsResourceResponse struct {
+	ErrorCode int16
+	ErrorMsg  string
+	Type      ConfigResourceType
+	Name      string
+}
+
+func (ct *AlterConfigsResponse) encode(pe packetEncoder) error {
+	pe.putInt32(int32(ct.ThrottleTime / time.Millisecond))
+
+	if err := pe.putArrayLength(len(ct.Resources)); err != nil {
+		return err
+	}
+
+	for i := range ct.Resources {
+		pe.putInt16(ct.Resources[i].ErrorCode)
+		err := pe.putString(ct.Resources[i].ErrorMsg)
+		if err != nil {
+			return nil
+		}
+		pe.putInt8(int8(ct.Resources[i].Type))
+		err = pe.putString(ct.Resources[i].Name)
+		if err != nil {
+			return nil
+		}
+	}
+
+	return nil
+}
+
+func (acr *AlterConfigsResponse) decode(pd packetDecoder, version int16) error {
+	throttleTime, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	acr.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+
+	responseCount, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	acr.Resources = make([]*AlterConfigsResourceResponse, responseCount)
+
+	for i := range acr.Resources {
+		acr.Resources[i] = new(AlterConfigsResourceResponse)
+
+		errCode, err := pd.getInt16()
+		if err != nil {
+			return err
+		}
+		acr.Resources[i].ErrorCode = errCode
+
+		e, err := pd.getString()
+		if err != nil {
+			return err
+		}
+		acr.Resources[i].ErrorMsg = e
+
+		t, err := pd.getInt8()
+		if err != nil {
+			return err
+		}
+		acr.Resources[i].Type = ConfigResourceType(t)
+
+		name, err := pd.getString()
+		if err != nil {
+			return err
+		}
+		acr.Resources[i].Name = name
+	}
+
+	return nil
+}
+
+func (r *AlterConfigsResponse) key() int16 {
+	return 32
+}
+
+func (r *AlterConfigsResponse) version() int16 {
+	return 0
+}
+
+func (r *AlterConfigsResponse) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}

+ 45 - 0
alter_configs_response_test.go

@@ -0,0 +1,45 @@
+package sarama
+
+import (
+	"testing"
+)
+
+var (
+	alterResponseEmpty = []byte{
+		0, 0, 0, 0, //throttle
+		0, 0, 0, 0, // no configs
+	}
+
+	alterResponsePopulated = []byte{
+		0, 0, 0, 0, //throttle
+		0, 0, 0, 1, // response
+		0, 0, //errorcode
+		0, 0, //string
+		2, // topic
+		0, 3, 'f', 'o', 'o',
+	}
+)
+
+func TestAlterConfigsResponse(t *testing.T) {
+	var response *AlterConfigsResponse
+
+	response = &AlterConfigsResponse{
+		Resources: []*AlterConfigsResourceResponse{},
+	}
+	testVersionDecodable(t, "empty", response, alterResponseEmpty, 0)
+	if len(response.Resources) != 0 {
+		t.Error("Expected no groups")
+	}
+
+	response = &AlterConfigsResponse{
+		Resources: []*AlterConfigsResourceResponse{
+			&AlterConfigsResourceResponse{
+				ErrorCode: 0,
+				ErrorMsg:  "",
+				Type:      TopicResource,
+				Name:      "foo",
+			},
+		},
+	}
+	testResponse(t, "response with error", response, alterResponsePopulated)
+}

+ 21 - 0
broker.go

@@ -483,6 +483,27 @@ func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCom
 	return response, nil
 }
 
+func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
+	response := new(DescribeConfigsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
+	response := new(AlterConfigsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
 func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()

+ 15 - 0
config_resource_type.go

@@ -0,0 +1,15 @@
+package sarama
+
+type ConfigResourceType int8
+
+// Taken from :
+// https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+Describe+and+Alter+Configs+Admin+APIs#KIP-133:DescribeandAlterConfigsAdminAPIs-WireFormattypes
+
+const (
+	UnknownResource ConfigResourceType = 0
+	AnyResource     ConfigResourceType = 1
+	TopicResource   ConfigResourceType = 2
+	GroupResource   ConfigResourceType = 3
+	ClusterResource ConfigResourceType = 4
+	BrokerResource  ConfigResourceType = 5
+)

+ 91 - 0
describe_configs_request.go

@@ -0,0 +1,91 @@
+package sarama
+
+type ConfigResource struct {
+	Type        ConfigResourceType
+	Name        string
+	ConfigNames []string
+}
+
+type DescribeConfigsRequest struct {
+	Resources []*ConfigResource
+}
+
+func (r *DescribeConfigsRequest) encode(pe packetEncoder) error {
+	if err := pe.putArrayLength(len(r.Resources)); err != nil {
+		return err
+	}
+
+	for _, c := range r.Resources {
+		pe.putInt8(int8(c.Type))
+		if err := pe.putString(c.Name); err != nil {
+			return err
+		}
+
+		if len(c.ConfigNames) == 0 {
+			pe.putInt32(-1)
+			continue
+		}
+		if err := pe.putStringArray(c.ConfigNames); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (r *DescribeConfigsRequest) decode(pd packetDecoder, version int16) (err error) {
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	r.Resources = make([]*ConfigResource, n)
+
+	for i := 0; i < n; i++ {
+		r.Resources[i] = &ConfigResource{}
+		t, err := pd.getInt8()
+		if err != nil {
+			return err
+		}
+		r.Resources[i].Type = ConfigResourceType(t)
+		name, err := pd.getString()
+		if err != nil {
+			return err
+		}
+		r.Resources[i].Name = name
+
+		confLength, err := pd.getArrayLength()
+
+		if err != nil {
+			return err
+		}
+
+		if confLength == -1 {
+			continue
+		}
+
+		cfnames := make([]string, confLength)
+		for i := 0; i < confLength; i++ {
+			s, err := pd.getString()
+			if err != nil {
+				return err
+			}
+			cfnames[i] = s
+		}
+		r.Resources[i].ConfigNames = cfnames
+	}
+
+	return nil
+}
+
+func (r *DescribeConfigsRequest) key() int16 {
+	return 32
+}
+
+func (r *DescribeConfigsRequest) version() int16 {
+	return 0
+}
+
+func (r *DescribeConfigsRequest) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}

+ 90 - 0
describe_configs_request_test.go

@@ -0,0 +1,90 @@
+package sarama
+
+import "testing"
+
+var (
+	emptyDescribeConfigsRequest = []byte{
+		0, 0, 0, 0, // 0 configs
+	}
+
+	singleDescribeConfigsRequest = []byte{
+		0, 0, 0, 1, // 1 config
+		2,                   // a topic
+		0, 3, 'f', 'o', 'o', // topic name: foo
+		0, 0, 0, 1, //1 config name
+		0, 10, // 10 chars
+		's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
+	}
+
+	doubleDescribeConfigsRequest = []byte{
+		0, 0, 0, 2, // 2 configs
+		2,                   // a topic
+		0, 3, 'f', 'o', 'o', // topic name: foo
+		0, 0, 0, 2, //2 config name
+		0, 10, // 10 chars
+		's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
+		0, 12, // 12 chars
+		'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's',
+		2,                   // a topic
+		0, 3, 'b', 'a', 'r', // topic name: foo
+		0, 0, 0, 1, // 1 config
+		0, 10, // 10 chars
+		's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
+	}
+
+	singleDescribeConfigsRequestAllConfigs = []byte{
+		0, 0, 0, 1, // 1 config
+		2,                   // a topic
+		0, 3, 'f', 'o', 'o', // topic name: foo
+		255, 255, 255, 255, // no configs
+	}
+)
+
+func TestDescribeConfigsRequest(t *testing.T) {
+	var request *DescribeConfigsRequest
+
+	request = &DescribeConfigsRequest{
+		Resources: []*ConfigResource{},
+	}
+	testRequest(t, "no requests", request, emptyDescribeConfigsRequest)
+
+	configs := []string{"segment.ms"}
+	request = &DescribeConfigsRequest{
+		Resources: []*ConfigResource{
+			&ConfigResource{
+				Type:        TopicResource,
+				Name:        "foo",
+				ConfigNames: configs,
+			},
+		},
+	}
+
+	testRequest(t, "one config", request, singleDescribeConfigsRequest)
+
+	request = &DescribeConfigsRequest{
+		Resources: []*ConfigResource{
+			&ConfigResource{
+				Type:        TopicResource,
+				Name:        "foo",
+				ConfigNames: []string{"segment.ms", "retention.ms"},
+			},
+			&ConfigResource{
+				Type:        TopicResource,
+				Name:        "bar",
+				ConfigNames: []string{"segment.ms"},
+			},
+		},
+	}
+	testRequest(t, "two configs", request, doubleDescribeConfigsRequest)
+
+	request = &DescribeConfigsRequest{
+		Resources: []*ConfigResource{
+			&ConfigResource{
+				Type: TopicResource,
+				Name: "foo",
+			},
+		},
+	}
+
+	testRequest(t, "one topic, all configs", request, singleDescribeConfigsRequestAllConfigs)
+}

+ 188 - 0
describe_configs_response.go

@@ -0,0 +1,188 @@
+package sarama
+
+import "time"
+
+type DescribeConfigsResponse struct {
+	ThrottleTime time.Duration
+	Resources    []*ResourceResponse
+}
+
+type ResourceResponse struct {
+	ErrorCode int16
+	ErrorMsg  string
+	Type      ConfigResourceType
+	Name      string
+	Configs   []*ConfigEntry
+}
+
+type ConfigEntry struct {
+	Name      string
+	Value     string
+	ReadOnly  bool
+	Default   bool
+	Sensitive bool
+}
+
+func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
+	pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
+	if err = pe.putArrayLength(len(r.Resources)); err != nil {
+		return err
+	}
+
+	for _, c := range r.Resources {
+		if err = c.encode(pe); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) {
+	throttleTime, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	r.Resources = make([]*ResourceResponse, n)
+	for i := 0; i < n; i++ {
+		rr := &ResourceResponse{}
+		if err := rr.decode(pd, version); err != nil {
+			return err
+		}
+		r.Resources[i] = rr
+	}
+
+	return nil
+}
+
+func (r *DescribeConfigsResponse) key() int16 {
+	return 32
+}
+
+func (r *DescribeConfigsResponse) version() int16 {
+	return 0
+}
+
+func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}
+
+func (r *ResourceResponse) encode(pe packetEncoder) (err error) {
+	pe.putInt16(r.ErrorCode)
+
+	if err = pe.putString(r.ErrorMsg); err != nil {
+		return err
+	}
+
+	pe.putInt8(int8(r.Type))
+
+	if err = pe.putString(r.Name); err != nil {
+		return err
+	}
+
+	if err = pe.putArrayLength(len(r.Configs)); err != nil {
+		return err
+	}
+
+	for _, c := range r.Configs {
+		if err = c.encode(pe); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (r *ResourceResponse) decode(pd packetDecoder, version int16) (err error) {
+	ec, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+	r.ErrorCode = ec
+
+	em, err := pd.getString()
+	if err != nil {
+		return err
+	}
+	r.ErrorMsg = em
+
+	t, err := pd.getInt8()
+	if err != nil {
+		return err
+	}
+	r.Type = ConfigResourceType(t)
+
+	name, err := pd.getString()
+	if err != nil {
+		return err
+	}
+	r.Name = name
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	r.Configs = make([]*ConfigEntry, n)
+	for i := 0; i < n; i++ {
+		c := &ConfigEntry{}
+		if err := c.decode(pd, version); err != nil {
+			return err
+		}
+		r.Configs[i] = c
+	}
+	return nil
+}
+
+func (r *ConfigEntry) encode(pe packetEncoder) (err error) {
+	if err = pe.putString(r.Name); err != nil {
+		return err
+	}
+
+	if err = pe.putString(r.Value); err != nil {
+		return err
+	}
+
+	pe.putBool(r.ReadOnly)
+	pe.putBool(r.Default)
+	pe.putBool(r.Sensitive)
+	return nil
+}
+
+func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
+	name, err := pd.getString()
+	if err != nil {
+		return err
+	}
+	r.Name = name
+
+	value, err := pd.getString()
+	if err != nil {
+		return err
+	}
+	r.Value = value
+
+	read, err := pd.getBool()
+	if err != nil {
+		return err
+	}
+	r.ReadOnly = read
+
+	de, err := pd.getBool()
+	if err != nil {
+		return err
+	}
+	r.Default = de
+
+	sensitive, err := pd.getBool()
+	if err != nil {
+		return err
+	}
+	r.Sensitive = sensitive
+	return nil
+}

+ 60 - 0
describe_configs_response_test.go

@@ -0,0 +1,60 @@
+package sarama
+
+import (
+	"testing"
+)
+
+var (
+	describeConfigsResponseEmpty = []byte{
+		0, 0, 0, 0, //throttle
+		0, 0, 0, 0, // no configs
+	}
+
+	describeConfigsResponsePopulated = []byte{
+		0, 0, 0, 0, //throttle
+		0, 0, 0, 1, // response
+		0, 0, //errorcode
+		0, 0, //string
+		2, // topic
+		0, 3, 'f', 'o', 'o',
+		0, 0, 0, 1, //configs
+		0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
+		0, 4, '1', '0', '0', '0',
+		0, // ReadOnly
+		0, // Default
+		0, // Sensitive
+	}
+)
+
+func TestDescribeConfigsResponse(t *testing.T) {
+	var response *DescribeConfigsResponse
+
+	response = &DescribeConfigsResponse{
+		Resources: []*ResourceResponse{},
+	}
+	testVersionDecodable(t, "empty", response, describeConfigsResponseEmpty, 0)
+	if len(response.Resources) != 0 {
+		t.Error("Expected no groups")
+	}
+
+	response = &DescribeConfigsResponse{
+		Resources: []*ResourceResponse{
+			&ResourceResponse{
+				ErrorCode: 0,
+				ErrorMsg:  "",
+				Type:      TopicResource,
+				Name:      "foo",
+				Configs: []*ConfigEntry{
+					&ConfigEntry{
+						Name:      "segment.ms",
+						Value:     "1000",
+						ReadOnly:  false,
+						Default:   false,
+						Sensitive: false,
+					},
+				},
+			},
+		},
+	}
+	testResponse(t, "response with error", response, describeConfigsResponsePopulated)
+}

+ 4 - 0
request.go

@@ -134,6 +134,10 @@ func allocateBody(key, version int16) protocolBody {
 		return &CreateAclsRequest{}
 	case 31:
 		return &DeleteAclsRequest{}
+	case 32:
+		return &DescribeConfigsRequest{}
+	case 33:
+		return &AlterConfigsRequest{}
 	case 37:
 		return &CreatePartitionsRequest{}
 	}