Преглед на файлове

add Describe/Create/DeleteAcls

Robin преди 6 години
родител
ревизия
86d6f390cf

+ 119 - 0
acl_bindings.go

@@ -0,0 +1,119 @@
+package sarama
+
+type Resource struct {
+	ResourceType AclResourceType
+	ResourceName string
+}
+
+func (r *Resource) encode(pe packetEncoder) error {
+	pe.putInt8(int8(r.ResourceType))
+
+	if err := pe.putString(r.ResourceName); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (r *Resource) decode(pd packetDecoder, version int16) (err error) {
+	resourceType, err := pd.getInt8()
+	if err != nil {
+		return err
+	}
+	r.ResourceType = AclResourceType(resourceType)
+
+	if r.ResourceName, err = pd.getString(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+type Acl struct {
+	Principal      string
+	Host           string
+	Operation      AclOperation
+	PermissionType AclPermissionType
+}
+
+func (a *Acl) encode(pe packetEncoder) error {
+	if err := pe.putString(a.Principal); err != nil {
+		return err
+	}
+
+	if err := pe.putString(a.Host); err != nil {
+		return err
+	}
+
+	pe.putInt8(int8(a.Operation))
+	pe.putInt8(int8(a.PermissionType))
+
+	return nil
+}
+
+func (a *Acl) decode(pd packetDecoder, version int16) (err error) {
+	if a.Principal, err = pd.getString(); err != nil {
+		return err
+	}
+
+	if a.Host, err = pd.getString(); err != nil {
+		return err
+	}
+
+	operation, err := pd.getInt8()
+	if err != nil {
+		return err
+	}
+	a.Operation = AclOperation(operation)
+
+	permissionType, err := pd.getInt8()
+	if err != nil {
+		return err
+	}
+	a.PermissionType = AclPermissionType(permissionType)
+
+	return nil
+}
+
+type ResourceAcls struct {
+	Resource
+	Acls []*Acl
+}
+
+func (r *ResourceAcls) encode(pe packetEncoder) error {
+	if err := r.Resource.encode(pe); err != nil {
+		return err
+	}
+
+	if err := pe.putArrayLength(len(r.Acls)); err != nil {
+		return err
+	}
+	for _, acl := range r.Acls {
+		if err := acl.encode(pe); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (r *ResourceAcls) decode(pd packetDecoder, version int16) error {
+	if err := r.Resource.decode(pd, version); err != nil {
+		return err
+	}
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	r.Acls = make([]*Acl, n)
+	for i := 0; i < n; i++ {
+		r.Acls[i] = new(Acl)
+		if err := r.Acls[i].decode(pd, version); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}

+ 76 - 0
acl_create_request.go

@@ -0,0 +1,76 @@
+package sarama
+
+type CreateAclsRequest struct {
+	AclCreations []*AclCreation
+}
+
+func (c *CreateAclsRequest) encode(pe packetEncoder) error {
+	if err := pe.putArrayLength(len(c.AclCreations)); err != nil {
+		return err
+	}
+
+	for _, aclCreation := range c.AclCreations {
+		if err := aclCreation.encode(pe); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (c *CreateAclsRequest) decode(pd packetDecoder, version int16) (err error) {
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	c.AclCreations = make([]*AclCreation, n)
+
+	for i := 0; i < n; i++ {
+		c.AclCreations[i] = new(AclCreation)
+		if err := c.AclCreations[i].decode(pd, version); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (d *CreateAclsRequest) key() int16 {
+	return 30
+}
+
+func (d *CreateAclsRequest) version() int16 {
+	return 0
+}
+
+func (d *CreateAclsRequest) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}
+
+type AclCreation struct {
+	Resource
+	Acl
+}
+
+func (a *AclCreation) encode(pe packetEncoder) error {
+	if err := a.Resource.encode(pe); err != nil {
+		return err
+	}
+	if err := a.Acl.encode(pe); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (a *AclCreation) decode(pd packetDecoder, version int16) (err error) {
+	if err := a.Resource.decode(pd, version); err != nil {
+		return err
+	}
+	if err := a.Acl.decode(pd, version); err != nil {
+		return err
+	}
+
+	return nil
+}

+ 34 - 0
acl_create_request_test.go

@@ -0,0 +1,34 @@
+package sarama
+
+import "testing"
+
+var (
+	aclCreateRequest = []byte{
+		0, 0, 0, 1,
+		3, // resource type = group
+		0, 5, 'g', 'r', 'o', 'u', 'p',
+		0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
+		0, 4, 'h', 'o', 's', 't',
+		2, // all
+		2, // deny
+	}
+)
+
+func TestCreateAclsRequest(t *testing.T) {
+	req := &CreateAclsRequest{
+		AclCreations: []*AclCreation{{
+			Resource: Resource{
+				ResourceType: AclResourceGroup,
+				ResourceName: "group",
+			},
+			Acl: Acl{
+				Principal:      "principal",
+				Host:           "host",
+				Operation:      AclOperationAll,
+				PermissionType: AclPermissionDeny,
+			}},
+		},
+	}
+
+	testRequest(t, "create request", req, aclCreateRequest)
+}

+ 88 - 0
acl_create_response.go

@@ -0,0 +1,88 @@
+package sarama
+
+import "time"
+
+type CreateAclsResponse struct {
+	ThrottleTime         time.Duration
+	AclCreationResponses []*AclCreationResponse
+}
+
+func (c *CreateAclsResponse) encode(pe packetEncoder) error {
+	pe.putInt32(int32(c.ThrottleTime / time.Millisecond))
+
+	if err := pe.putArrayLength(len(c.AclCreationResponses)); err != nil {
+		return err
+	}
+
+	for _, aclCreationResponse := range c.AclCreationResponses {
+		if err := aclCreationResponse.encode(pe); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (c *CreateAclsResponse) decode(pd packetDecoder, version int16) (err error) {
+	throttleTime, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	c.AclCreationResponses = make([]*AclCreationResponse, n)
+	for i := 0; i < n; i++ {
+		c.AclCreationResponses[i] = new(AclCreationResponse)
+		if err := c.AclCreationResponses[i].decode(pd, version); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (d *CreateAclsResponse) key() int16 {
+	return 30
+}
+
+func (d *CreateAclsResponse) version() int16 {
+	return 0
+}
+
+func (d *CreateAclsResponse) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}
+
+type AclCreationResponse struct {
+	Err    KError
+	ErrMsg *string
+}
+
+func (a *AclCreationResponse) encode(pe packetEncoder) error {
+	pe.putInt16(int16(a.Err))
+
+	if err := pe.putNullableString(a.ErrMsg); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (a *AclCreationResponse) decode(pd packetDecoder, version int16) (err error) {
+	kerr, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+	a.Err = KError(kerr)
+
+	if a.ErrMsg, err = pd.getNullableString(); err != nil {
+		return err
+	}
+
+	return nil
+}

+ 41 - 0
acl_create_response_test.go

@@ -0,0 +1,41 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+var (
+	createResponseWithError = []byte{
+		0, 0, 0, 100,
+		0, 0, 0, 1,
+		0, 42,
+		0, 5, 'e', 'r', 'r', 'o', 'r',
+	}
+
+	createResponseArray = []byte{
+		0, 0, 0, 100,
+		0, 0, 0, 2,
+		0, 42,
+		0, 5, 'e', 'r', 'r', 'o', 'r',
+		0, 0,
+		255, 255,
+	}
+)
+
+func TestCreateAclsResponse(t *testing.T) {
+	errmsg := "error"
+	resp := &CreateAclsResponse{
+		ThrottleTime: 100 * time.Millisecond,
+		AclCreationResponses: []*AclCreationResponse{{
+			Err:    ErrInvalidRequest,
+			ErrMsg: &errmsg,
+		}},
+	}
+
+	testResponse(t, "response with error", resp, createResponseWithError)
+
+	resp.AclCreationResponses = append(resp.AclCreationResponses, new(AclCreationResponse))
+
+	testResponse(t, "response array", resp, createResponseArray)
+}

+ 48 - 0
acl_delete_request.go

@@ -0,0 +1,48 @@
+package sarama
+
+type DeleteAclsRequest struct {
+	Filters []*AclFilter
+}
+
+func (d *DeleteAclsRequest) encode(pe packetEncoder) error {
+	if err := pe.putArrayLength(len(d.Filters)); err != nil {
+		return err
+	}
+
+	for _, filter := range d.Filters {
+		if err := filter.encode(pe); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (d *DeleteAclsRequest) decode(pd packetDecoder, version int16) (err error) {
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	d.Filters = make([]*AclFilter, n)
+	for i := 0; i < n; i++ {
+		d.Filters[i] = new(AclFilter)
+		if err := d.Filters[i].decode(pd, version); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (d *DeleteAclsRequest) key() int16 {
+	return 31
+}
+
+func (d *DeleteAclsRequest) version() int16 {
+	return 0
+}
+
+func (d *DeleteAclsRequest) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}

+ 69 - 0
acl_delete_request_test.go

@@ -0,0 +1,69 @@
+package sarama
+
+import "testing"
+
+var (
+	aclDeleteRequestNulls = []byte{
+		0, 0, 0, 1,
+		1,
+		255, 255,
+		255, 255,
+		255, 255,
+		11,
+		3,
+	}
+
+	aclDeleteRequest = []byte{
+		0, 0, 0, 1,
+		1, // any
+		0, 6, 'f', 'i', 'l', 't', 'e', 'r',
+		0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
+		0, 4, 'h', 'o', 's', 't',
+		4, // write
+		3, // allow
+	}
+
+	aclDeleteRequestArray = []byte{
+		0, 0, 0, 2,
+		1,
+		0, 6, 'f', 'i', 'l', 't', 'e', 'r',
+		0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
+		0, 4, 'h', 'o', 's', 't',
+		4, // write
+		3, // allow
+		2,
+		0, 5, 't', 'o', 'p', 'i', 'c',
+		255, 255,
+		255, 255,
+		6,
+		2,
+	}
+)
+
+func TestDeleteAclsRequest(t *testing.T) {
+	req := &DeleteAclsRequest{
+		Filters: []*AclFilter{{
+			ResourceType:   AclResourceAny,
+			Operation:      AclOperationAlterConfigs,
+			PermissionType: AclPermissionAllow,
+		}},
+	}
+
+	testRequest(t, "delete request nulls", req, aclDeleteRequestNulls)
+
+	req.Filters[0].ResourceName = nullString("filter")
+	req.Filters[0].Principal = nullString("principal")
+	req.Filters[0].Host = nullString("host")
+	req.Filters[0].Operation = AclOperationWrite
+
+	testRequest(t, "delete request", req, aclDeleteRequest)
+
+	req.Filters = append(req.Filters, &AclFilter{
+		ResourceType:   AclResourceTopic,
+		ResourceName:   nullString("topic"),
+		Operation:      AclOperationDelete,
+		PermissionType: AclPermissionDeny,
+	})
+
+	testRequest(t, "delete request array", req, aclDeleteRequestArray)
+}

+ 155 - 0
acl_delete_response.go

@@ -0,0 +1,155 @@
+package sarama
+
+import "time"
+
+type DeleteAclsResponse struct {
+	ThrottleTime    time.Duration
+	FilterResponses []*FilterResponse
+}
+
+func (a *DeleteAclsResponse) encode(pe packetEncoder) error {
+	pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
+
+	if err := pe.putArrayLength(len(a.FilterResponses)); err != nil {
+		return err
+	}
+
+	for _, filterResponse := range a.FilterResponses {
+		if err := filterResponse.encode(pe); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (a *DeleteAclsResponse) decode(pd packetDecoder, version int16) (err error) {
+	throttleTime, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+	a.FilterResponses = make([]*FilterResponse, n)
+
+	for i := 0; i < n; i++ {
+		a.FilterResponses[i] = new(FilterResponse)
+		if err := a.FilterResponses[i].decode(pd, version); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (d *DeleteAclsResponse) key() int16 {
+	return 31
+}
+
+func (d *DeleteAclsResponse) version() int16 {
+	return 0
+}
+
+func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}
+
+type FilterResponse struct {
+	Err          KError
+	ErrMsg       *string
+	MatchingAcls []*MatchingAcl
+}
+
+func (f *FilterResponse) encode(pe packetEncoder) error {
+	pe.putInt16(int16(f.Err))
+	if err := pe.putNullableString(f.ErrMsg); err != nil {
+		return err
+	}
+
+	if err := pe.putArrayLength(len(f.MatchingAcls)); err != nil {
+		return err
+	}
+	for _, matchingAcl := range f.MatchingAcls {
+		if err := matchingAcl.encode(pe); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (f *FilterResponse) decode(pd packetDecoder, version int16) (err error) {
+	kerr, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+	f.Err = KError(kerr)
+
+	if f.ErrMsg, err = pd.getNullableString(); err != nil {
+		return err
+	}
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+	f.MatchingAcls = make([]*MatchingAcl, n)
+	for i := 0; i < n; i++ {
+		f.MatchingAcls[i] = new(MatchingAcl)
+		if err := f.MatchingAcls[i].decode(pd, version); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+type MatchingAcl struct {
+	Err    KError
+	ErrMsg *string
+	Resource
+	Acl
+}
+
+func (m *MatchingAcl) encode(pe packetEncoder) error {
+	pe.putInt16(int16(m.Err))
+	if err := pe.putNullableString(m.ErrMsg); err != nil {
+		return err
+	}
+
+	if err := m.Resource.encode(pe); err != nil {
+		return err
+	}
+
+	if err := m.Acl.encode(pe); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (m *MatchingAcl) decode(pd packetDecoder, version int16) (err error) {
+	kerr, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+	m.Err = KError(kerr)
+
+	if m.ErrMsg, err = pd.getNullableString(); err != nil {
+		return err
+	}
+
+	if err := m.Resource.decode(pd, version); err != nil {
+		return err
+	}
+
+	if err := m.Acl.decode(pd, version); err != nil {
+		return err
+	}
+
+	return nil
+}

+ 38 - 0
acl_delete_response_test.go

@@ -0,0 +1,38 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+var (
+	deleteAclsResponse = []byte{
+		0, 0, 0, 100,
+		0, 0, 0, 1,
+		0, 0, // no error
+		255, 255, // no error message
+		0, 0, 0, 1, // 1 matching acl
+		0, 0, // no error
+		255, 255, // no error message
+		2, // resource type
+		0, 5, 't', 'o', 'p', 'i', 'c',
+		0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
+		0, 4, 'h', 'o', 's', 't',
+		4,
+		3,
+	}
+)
+
+func TestDeleteAclsResponse(t *testing.T) {
+	resp := &DeleteAclsResponse{
+		ThrottleTime: 100 * time.Millisecond,
+		FilterResponses: []*FilterResponse{{
+			MatchingAcls: []*MatchingAcl{{
+				Resource: Resource{ResourceType: AclResourceTopic, ResourceName: "topic"},
+				Acl:      Acl{Principal: "principal", Host: "host", Operation: AclOperationWrite, PermissionType: AclPermissionAllow},
+			}},
+		}},
+	}
+
+	testResponse(t, "", resp, deleteAclsResponse)
+}

+ 25 - 0
acl_describe_request.go

@@ -0,0 +1,25 @@
+package sarama
+
+type DescribeAclsRequest struct {
+	AclFilter
+}
+
+func (d *DescribeAclsRequest) encode(pe packetEncoder) error {
+	return d.AclFilter.encode(pe)
+}
+
+func (d *DescribeAclsRequest) decode(pd packetDecoder, version int16) (err error) {
+	return d.AclFilter.decode(pd, version)
+}
+
+func (d *DescribeAclsRequest) key() int16 {
+	return 29
+}
+
+func (d *DescribeAclsRequest) version() int16 {
+	return 0
+}
+
+func (d *DescribeAclsRequest) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}

+ 35 - 0
acl_describe_request_test.go

@@ -0,0 +1,35 @@
+package sarama
+
+import (
+	"testing"
+)
+
+var (
+	aclDescribeRequest = []byte{
+		2, // resource type
+		0, 5, 't', 'o', 'p', 'i', 'c',
+		0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
+		0, 4, 'h', 'o', 's', 't',
+		5, // acl operation
+		3, // acl permission type
+	}
+)
+
+func TestAclDescribeRequest(t *testing.T) {
+	resourcename := "topic"
+	principal := "principal"
+	host := "host"
+
+	req := &DescribeAclsRequest{
+		AclFilter{
+			ResourceType:   AclResourceTopic,
+			ResourceName:   &resourcename,
+			Principal:      &principal,
+			Host:           &host,
+			Operation:      AclOperationCreate,
+			PermissionType: AclPermissionAllow,
+		},
+	}
+
+	testRequest(t, "", req, aclDescribeRequest)
+}

+ 80 - 0
acl_describe_response.go

@@ -0,0 +1,80 @@
+package sarama
+
+import "time"
+
+type DescribeAclsResponse struct {
+	ThrottleTime time.Duration
+	Err          KError
+	ErrMsg       *string
+	ResourceAcls []*ResourceAcls
+}
+
+func (d *DescribeAclsResponse) encode(pe packetEncoder) error {
+	pe.putInt32(int32(d.ThrottleTime / time.Millisecond))
+	pe.putInt16(int16(d.Err))
+
+	if err := pe.putNullableString(d.ErrMsg); err != nil {
+		return err
+	}
+
+	if err := pe.putArrayLength(len(d.ResourceAcls)); err != nil {
+		return err
+	}
+
+	for _, resourceAcl := range d.ResourceAcls {
+		if err := resourceAcl.encode(pe); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (d *DescribeAclsResponse) decode(pd packetDecoder, version int16) (err error) {
+	throttleTime, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+
+	kerr, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+	d.Err = KError(kerr)
+
+	errmsg, err := pd.getString()
+	if err != nil {
+		return err
+	}
+	if errmsg != "" {
+		d.ErrMsg = &errmsg
+	}
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+	d.ResourceAcls = make([]*ResourceAcls, n)
+
+	for i := 0; i < n; i++ {
+		d.ResourceAcls[i] = new(ResourceAcls)
+		if err := d.ResourceAcls[i].decode(pd, version); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (d *DescribeAclsResponse) key() int16 {
+	return 29
+}
+
+func (d *DescribeAclsResponse) version() int16 {
+	return 0
+}
+
+func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}

+ 45 - 0
acl_describe_response_test.go

@@ -0,0 +1,45 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+var aclDescribeResponseError = []byte{
+	0, 0, 0, 100,
+	0, 8, // error
+	0, 5, 'e', 'r', 'r', 'o', 'r',
+	0, 0, 0, 1, // 1 resource
+	2, // cluster type
+	0, 5, 't', 'o', 'p', 'i', 'c',
+	0, 0, 0, 1, // 1 acl
+	0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
+	0, 4, 'h', 'o', 's', 't',
+	4, // write
+	3, // allow
+}
+
+func TestAclDescribeResponse(t *testing.T) {
+	errmsg := "error"
+	resp := &DescribeAclsResponse{
+		ThrottleTime: 100 * time.Millisecond,
+		Err:          ErrBrokerNotAvailable,
+		ErrMsg:       &errmsg,
+		ResourceAcls: []*ResourceAcls{{
+			Resource: Resource{
+				ResourceName: "topic",
+				ResourceType: AclResourceTopic,
+			},
+			Acls: []*Acl{
+				{
+					Principal:      "principal",
+					Host:           "host",
+					Operation:      AclOperationWrite,
+					PermissionType: AclPermissionAllow,
+				},
+			},
+		}},
+	}
+
+	testResponse(t, "describe", resp, aclDescribeResponseError)
+}

+ 61 - 0
acl_filter.go

@@ -0,0 +1,61 @@
+package sarama
+
+type AclFilter struct {
+	ResourceType   AclResourceType
+	ResourceName   *string
+	Principal      *string
+	Host           *string
+	Operation      AclOperation
+	PermissionType AclPermissionType
+}
+
+func (a *AclFilter) encode(pe packetEncoder) error {
+	pe.putInt8(int8(a.ResourceType))
+	if err := pe.putNullableString(a.ResourceName); err != nil {
+		return err
+	}
+	if err := pe.putNullableString(a.Principal); err != nil {
+		return err
+	}
+	if err := pe.putNullableString(a.Host); err != nil {
+		return err
+	}
+	pe.putInt8(int8(a.Operation))
+	pe.putInt8(int8(a.PermissionType))
+
+	return nil
+}
+
+func (a *AclFilter) decode(pd packetDecoder, version int16) (err error) {
+	resourceType, err := pd.getInt8()
+	if err != nil {
+		return err
+	}
+	a.ResourceType = AclResourceType(resourceType)
+
+	if a.ResourceName, err = pd.getNullableString(); err != nil {
+		return err
+	}
+
+	if a.Principal, err = pd.getNullableString(); err != nil {
+		return err
+	}
+
+	if a.Host, err = pd.getNullableString(); err != nil {
+		return err
+	}
+
+	operation, err := pd.getInt8()
+	if err != nil {
+		return err
+	}
+	a.Operation = AclOperation(operation)
+
+	permissionType, err := pd.getInt8()
+	if err != nil {
+		return err
+	}
+	a.PermissionType = AclPermissionType(permissionType)
+
+	return nil
+}

+ 42 - 0
acl_types.go

@@ -0,0 +1,42 @@
+package sarama
+
+type AclOperation int
+
+// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
+const (
+	AclOperationUnknown         AclOperation = 0
+	AclOperationAny             AclOperation = 1
+	AclOperationAll             AclOperation = 2
+	AclOperationRead            AclOperation = 3
+	AclOperationWrite           AclOperation = 4
+	AclOperationCreate          AclOperation = 5
+	AclOperationDelete          AclOperation = 6
+	AclOperationAlter           AclOperation = 7
+	AclOperationDescribe        AclOperation = 8
+	AclOperationClusterAction   AclOperation = 9
+	AclOperationDescribeConfigs AclOperation = 10
+	AclOperationAlterConfigs    AclOperation = 11
+	AclOperationIdempotentWrite AclOperation = 12
+)
+
+type AclPermissionType int
+
+// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
+const (
+	AclPermissionUnknown AclPermissionType = 0
+	AclPermissionAny     AclPermissionType = 1
+	AclPermissionDeny    AclPermissionType = 2
+	AclPermissionAllow   AclPermissionType = 3
+)
+
+type AclResourceType int
+
+// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
+const (
+	AclResourceUnknown         AclResourceType = 0
+	AclResourceAny             AclResourceType = 1
+	AclResourceTopic           AclResourceType = 2
+	AclResourceGroup           AclResourceType = 3
+	AclResourceCluster         AclResourceType = 4
+	AclResourceTransactionalID AclResourceType = 5
+)

+ 33 - 0
broker.go

@@ -395,6 +395,39 @@ func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsRespon
 	return response, nil
 }
 
+func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
+	response := new(DescribeAclsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
+	response := new(CreateAclsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
+	response := new(DeleteAclsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()

+ 6 - 0
request.go

@@ -118,6 +118,12 @@ func allocateBody(key, version int16) protocolBody {
 		return &CreateTopicsRequest{}
 	case 20:
 		return &DeleteTopicsRequest{}
+	case 29:
+		return &DescribeAclsRequest{}
+	case 30:
+		return &CreateAclsRequest{}
+	case 31:
+		return &DeleteAclsRequest{}
 	case 37:
 		return &CreatePartitionsRequest{}
 	}

+ 2 - 0
request_test.go

@@ -96,3 +96,5 @@ func testResponse(t *testing.T, name string, res protocolBody, expected []byte)
 		t.Errorf("Decoded response does not match the encoded one\nencoded: %#v\ndecoded: %#v", res, decoded)
 	}
 }
+
+func nullString(s string) *string { return &s }