Quellcode durchsuchen

Implements v1 of {Create,Describe,Delete}AclRequest

Conor Mongey vor 5 Jahren
Ursprung
Commit
66abce6c25

+ 21 - 5
acl_bindings.go

@@ -1,17 +1,26 @@
 package sarama
 
 type Resource struct {
-	ResourceType AclResourceType
-	ResourceName string
+	ResourceType       AclResourceType
+	ResourceName       string
+	ResoucePatternType AclResourcePatternType
 }
 
-func (r *Resource) encode(pe packetEncoder) error {
+func (r *Resource) encode(pe packetEncoder, version int16) error {
 	pe.putInt8(int8(r.ResourceType))
 
 	if err := pe.putString(r.ResourceName); err != nil {
 		return err
 	}
 
+	if version == 1 {
+		if r.ResoucePatternType == AclPatternUnknown {
+			Logger.Print("Cannot encode an unknown resource pattern type, using Literal instead")
+			r.ResoucePatternType = AclPatternLiteral
+		}
+		pe.putInt8(int8(r.ResoucePatternType))
+	}
+
 	return nil
 }
 
@@ -25,6 +34,13 @@ func (r *Resource) decode(pd packetDecoder, version int16) (err error) {
 	if r.ResourceName, err = pd.getString(); err != nil {
 		return err
 	}
+	if version == 1 {
+		pattern, err := pd.getInt8()
+		if err != nil {
+			return err
+		}
+		r.ResoucePatternType = AclResourcePatternType(pattern)
+	}
 
 	return nil
 }
@@ -80,8 +96,8 @@ type ResourceAcls struct {
 	Acls []*Acl
 }
 
-func (r *ResourceAcls) encode(pe packetEncoder) error {
-	if err := r.Resource.encode(pe); err != nil {
+func (r *ResourceAcls) encode(pe packetEncoder, version int16) error {
+	if err := r.Resource.encode(pe, version); err != nil {
 		return err
 	}
 

+ 12 - 5
acl_create_request.go

@@ -1,6 +1,7 @@
 package sarama
 
 type CreateAclsRequest struct {
+	Version      int16
 	AclCreations []*AclCreation
 }
 
@@ -10,7 +11,7 @@ func (c *CreateAclsRequest) encode(pe packetEncoder) error {
 	}
 
 	for _, aclCreation := range c.AclCreations {
-		if err := aclCreation.encode(pe); err != nil {
+		if err := aclCreation.encode(pe, c.Version); err != nil {
 			return err
 		}
 	}
@@ -19,6 +20,7 @@ func (c *CreateAclsRequest) encode(pe packetEncoder) error {
 }
 
 func (c *CreateAclsRequest) decode(pd packetDecoder, version int16) (err error) {
+	c.Version = version
 	n, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -41,11 +43,16 @@ func (d *CreateAclsRequest) key() int16 {
 }
 
 func (d *CreateAclsRequest) version() int16 {
-	return 0
+	return d.Version
 }
 
 func (d *CreateAclsRequest) requiredVersion() KafkaVersion {
-	return V0_11_0_0
+	switch d.Version {
+	case 1:
+		return V2_0_0_0
+	default:
+		return V0_11_0_0
+	}
 }
 
 type AclCreation struct {
@@ -53,8 +60,8 @@ type AclCreation struct {
 	Acl
 }
 
-func (a *AclCreation) encode(pe packetEncoder) error {
-	if err := a.Resource.encode(pe); err != nil {
+func (a *AclCreation) encode(pe packetEncoder, version int16) error {
+	if err := a.Resource.encode(pe, version); err != nil {
 		return err
 	}
 	if err := a.Acl.encode(pe); err != nil {

+ 33 - 1
acl_create_request_test.go

@@ -12,10 +12,21 @@ var (
 		2, // all
 		2, // deny
 	}
+	aclCreateRequestv1 = []byte{
+		0, 0, 0, 1,
+		3, // resource type = group
+		0, 5, 'g', 'r', 'o', 'u', 'p',
+		3, // resource pattten type = literal
+		0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
+		0, 4, 'h', 'o', 's', 't',
+		2, // all
+		2, // deny
+	}
 )
 
-func TestCreateAclsRequest(t *testing.T) {
+func TestCreateAclsRequestv0(t *testing.T) {
 	req := &CreateAclsRequest{
+		Version: 0,
 		AclCreations: []*AclCreation{{
 			Resource: Resource{
 				ResourceType: AclResourceGroup,
@@ -32,3 +43,24 @@ func TestCreateAclsRequest(t *testing.T) {
 
 	testRequest(t, "create request", req, aclCreateRequest)
 }
+
+func TestCreateAclsRequestv1(t *testing.T) {
+	req := &CreateAclsRequest{
+		Version: 1,
+		AclCreations: []*AclCreation{{
+			Resource: Resource{
+				ResourceType:       AclResourceGroup,
+				ResourceName:       "group",
+				ResoucePatternType: AclPatternLiteral,
+			},
+			Acl: Acl{
+				Principal:      "principal",
+				Host:           "host",
+				Operation:      AclOperationAll,
+				PermissionType: AclPermissionDeny,
+			}},
+		},
+	}
+
+	testRequest(t, "create request v1", req, aclCreateRequestv1)
+}

+ 11 - 2
acl_delete_request.go

@@ -1,6 +1,7 @@
 package sarama
 
 type DeleteAclsRequest struct {
+	Version int
 	Filters []*AclFilter
 }
 
@@ -10,6 +11,7 @@ func (d *DeleteAclsRequest) encode(pe packetEncoder) error {
 	}
 
 	for _, filter := range d.Filters {
+		filter.Version = d.Version
 		if err := filter.encode(pe); err != nil {
 			return err
 		}
@@ -19,6 +21,7 @@ func (d *DeleteAclsRequest) encode(pe packetEncoder) error {
 }
 
 func (d *DeleteAclsRequest) decode(pd packetDecoder, version int16) (err error) {
+	d.Version = int(version)
 	n, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -27,6 +30,7 @@ func (d *DeleteAclsRequest) decode(pd packetDecoder, version int16) (err error)
 	d.Filters = make([]*AclFilter, n)
 	for i := 0; i < n; i++ {
 		d.Filters[i] = new(AclFilter)
+		d.Filters[i].Version = int(version)
 		if err := d.Filters[i].decode(pd, version); err != nil {
 			return err
 		}
@@ -40,9 +44,14 @@ func (d *DeleteAclsRequest) key() int16 {
 }
 
 func (d *DeleteAclsRequest) version() int16 {
-	return 0
+	return int16(d.Version)
 }
 
 func (d *DeleteAclsRequest) requiredVersion() KafkaVersion {
-	return V0_11_0_0
+	switch d.Version {
+	case 1:
+		return V2_0_0_0
+	default:
+		return V0_11_0_0
+	}
 }

+ 43 - 0
acl_delete_request_test.go

@@ -3,6 +3,28 @@ package sarama
 import "testing"
 
 var (
+	aclDeleteRequestNullsv1 = []byte{
+		0, 0, 0, 1,
+		1,
+		255, 255,
+		1, // Any
+		255, 255,
+		255, 255,
+		11,
+		3,
+	}
+
+	aclDeleteRequestv1 = []byte{
+		0, 0, 0, 1,
+		1, // any
+		0, 6, 'f', 'i', 'l', 't', 'e', 'r',
+		1, // Any Filter
+		0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
+		0, 4, 'h', 'o', 's', 't',
+		4, // write
+		3, // allow
+	}
+
 	aclDeleteRequestNulls = []byte{
 		0, 0, 0, 1,
 		1,
@@ -67,3 +89,24 @@ func TestDeleteAclsRequest(t *testing.T) {
 
 	testRequest(t, "delete request array", req, aclDeleteRequestArray)
 }
+
+func TestDeleteAclsRequestV1(t *testing.T) {
+	req := &DeleteAclsRequest{
+		Version: 1,
+		Filters: []*AclFilter{{
+			ResourceType:              AclResourceAny,
+			Operation:                 AclOperationAlterConfigs,
+			PermissionType:            AclPermissionAllow,
+			ResourcePatternTypeFilter: AclPatternAny,
+		}},
+	}
+
+	testRequest(t, "delete request nulls", req, aclDeleteRequestNullsv1)
+
+	req.Filters[0].ResourceName = nullString("filter")
+	req.Filters[0].Principal = nullString("principal")
+	req.Filters[0].Host = nullString("host")
+	req.Filters[0].Operation = AclOperationWrite
+
+	testRequest(t, "delete request", req, aclDeleteRequestv1)
+}

+ 7 - 6
acl_delete_response.go

@@ -3,6 +3,7 @@ package sarama
 import "time"
 
 type DeleteAclsResponse struct {
+	Version         int16
 	ThrottleTime    time.Duration
 	FilterResponses []*FilterResponse
 }
@@ -15,7 +16,7 @@ func (a *DeleteAclsResponse) encode(pe packetEncoder) error {
 	}
 
 	for _, filterResponse := range a.FilterResponses {
-		if err := filterResponse.encode(pe); err != nil {
+		if err := filterResponse.encode(pe, a.Version); err != nil {
 			return err
 		}
 	}
@@ -51,7 +52,7 @@ func (d *DeleteAclsResponse) key() int16 {
 }
 
 func (d *DeleteAclsResponse) version() int16 {
-	return 0
+	return int16(d.Version)
 }
 
 func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
@@ -64,7 +65,7 @@ type FilterResponse struct {
 	MatchingAcls []*MatchingAcl
 }
 
-func (f *FilterResponse) encode(pe packetEncoder) error {
+func (f *FilterResponse) encode(pe packetEncoder, version int16) error {
 	pe.putInt16(int16(f.Err))
 	if err := pe.putNullableString(f.ErrMsg); err != nil {
 		return err
@@ -74,7 +75,7 @@ func (f *FilterResponse) encode(pe packetEncoder) error {
 		return err
 	}
 	for _, matchingAcl := range f.MatchingAcls {
-		if err := matchingAcl.encode(pe); err != nil {
+		if err := matchingAcl.encode(pe, version); err != nil {
 			return err
 		}
 	}
@@ -115,13 +116,13 @@ type MatchingAcl struct {
 	Acl
 }
 
-func (m *MatchingAcl) encode(pe packetEncoder) error {
+func (m *MatchingAcl) encode(pe packetEncoder, version int16) error {
 	pe.putInt16(int16(m.Err))
 	if err := pe.putNullableString(m.ErrMsg); err != nil {
 		return err
 	}
 
-	if err := m.Resource.encode(pe); err != nil {
+	if err := m.Resource.encode(pe, version); err != nil {
 		return err
 	}
 

+ 11 - 2
acl_describe_request.go

@@ -1,14 +1,18 @@
 package sarama
 
 type DescribeAclsRequest struct {
+	Version int
 	AclFilter
 }
 
 func (d *DescribeAclsRequest) encode(pe packetEncoder) error {
+	d.AclFilter.Version = d.Version
 	return d.AclFilter.encode(pe)
 }
 
 func (d *DescribeAclsRequest) decode(pd packetDecoder, version int16) (err error) {
+	d.Version = int(version)
+	d.AclFilter.Version = int(version)
 	return d.AclFilter.decode(pd, version)
 }
 
@@ -17,9 +21,14 @@ func (d *DescribeAclsRequest) key() int16 {
 }
 
 func (d *DescribeAclsRequest) version() int16 {
-	return 0
+	return int16(d.Version)
 }
 
 func (d *DescribeAclsRequest) requiredVersion() KafkaVersion {
-	return V0_11_0_0
+	switch d.Version {
+	case 1:
+		return V2_0_0_0
+	default:
+		return V0_11_0_0
+	}
 }

+ 32 - 2
acl_describe_request_test.go

@@ -13,15 +13,24 @@ var (
 		5, // acl operation
 		3, // acl permission type
 	}
+	aclDescribeRequestV1 = []byte{
+		2, // resource type
+		0, 5, 't', 'o', 'p', 'i', 'c',
+		1, // any Type
+		0, 9, 'p', 'r', 'i', 'n', 'c', 'i', 'p', 'a', 'l',
+		0, 4, 'h', 'o', 's', 't',
+		5, // acl operation
+		3, // acl permission type
+	}
 )
 
-func TestAclDescribeRequest(t *testing.T) {
+func TestAclDescribeRequestV0(t *testing.T) {
 	resourcename := "topic"
 	principal := "principal"
 	host := "host"
 
 	req := &DescribeAclsRequest{
-		AclFilter{
+		AclFilter: AclFilter{
 			ResourceType:   AclResourceTopic,
 			ResourceName:   &resourcename,
 			Principal:      &principal,
@@ -33,3 +42,24 @@ func TestAclDescribeRequest(t *testing.T) {
 
 	testRequest(t, "", req, aclDescribeRequest)
 }
+
+func TestAclDescribeRequestV1(t *testing.T) {
+	resourcename := "topic"
+	principal := "principal"
+	host := "host"
+
+	req := &DescribeAclsRequest{
+		Version: 1,
+		AclFilter: AclFilter{
+			ResourceType:              AclResourceTopic,
+			ResourceName:              &resourcename,
+			ResourcePatternTypeFilter: AclPatternAny,
+			Principal:                 &principal,
+			Host:                      &host,
+			Operation:                 AclOperationCreate,
+			PermissionType:            AclPermissionAllow,
+		},
+	}
+
+	testRequest(t, "", req, aclDescribeRequestV1)
+}

+ 9 - 3
acl_describe_response.go

@@ -3,6 +3,7 @@ package sarama
 import "time"
 
 type DescribeAclsResponse struct {
+	Version      int16
 	ThrottleTime time.Duration
 	Err          KError
 	ErrMsg       *string
@@ -22,7 +23,7 @@ func (d *DescribeAclsResponse) encode(pe packetEncoder) error {
 	}
 
 	for _, resourceAcl := range d.ResourceAcls {
-		if err := resourceAcl.encode(pe); err != nil {
+		if err := resourceAcl.encode(pe, d.Version); err != nil {
 			return err
 		}
 	}
@@ -72,9 +73,14 @@ func (d *DescribeAclsResponse) key() int16 {
 }
 
 func (d *DescribeAclsResponse) version() int16 {
-	return 0
+	return int16(d.Version)
 }
 
 func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
-	return V0_11_0_0
+	switch d.Version {
+	case 1:
+		return V2_0_0_0
+	default:
+		return V0_11_0_0
+	}
 }

+ 23 - 6
acl_filter.go

@@ -1,12 +1,14 @@
 package sarama
 
 type AclFilter struct {
-	ResourceType   AclResourceType
-	ResourceName   *string
-	Principal      *string
-	Host           *string
-	Operation      AclOperation
-	PermissionType AclPermissionType
+	Version                   int
+	ResourceType              AclResourceType
+	ResourceName              *string
+	ResourcePatternTypeFilter AclResourcePatternType
+	Principal                 *string
+	Host                      *string
+	Operation                 AclOperation
+	PermissionType            AclPermissionType
 }
 
 func (a *AclFilter) encode(pe packetEncoder) error {
@@ -14,6 +16,11 @@ func (a *AclFilter) encode(pe packetEncoder) error {
 	if err := pe.putNullableString(a.ResourceName); err != nil {
 		return err
 	}
+
+	if a.Version == 1 {
+		pe.putInt8(int8(a.ResourcePatternTypeFilter))
+	}
+
 	if err := pe.putNullableString(a.Principal); err != nil {
 		return err
 	}
@@ -37,6 +44,16 @@ func (a *AclFilter) decode(pd packetDecoder, version int16) (err error) {
 		return err
 	}
 
+	if a.Version == 1 {
+		pattern, err := pd.getInt8()
+
+		if err != nil {
+			return err
+		}
+
+		a.ResourcePatternTypeFilter = AclResourcePatternType(pattern)
+	}
+
 	if a.Principal, err = pd.getNullableString(); err != nil {
 		return err
 	}

+ 12 - 0
acl_types.go

@@ -40,3 +40,15 @@ const (
 	AclResourceCluster         AclResourceType = 4
 	AclResourceTransactionalID AclResourceType = 5
 )
+
+type AclResourcePatternType int
+
+// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java
+
+const (
+	AclPatternUnknown AclResourcePatternType = iota
+	AclPatternAny
+	AclPatternMatch
+	AclPatternLiteral
+	AclPatternPrefixed
+)