浏览代码

Satisfy the error interface in create responses

Contributes-to: Shopify/sarama#1153

Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
Dominic Evans 7 年之前
父节点
当前提交
ec2cbc3814
共有 7 个文件被更改,包括 154 次插入5 次删除
  1. 2 2
      admin.go
  2. 60 0
      admin_test.go
  3. 12 1
      create_partitions_response.go
  4. 24 0
      create_partitions_response_test.go
  5. 12 1
      create_topics_response.go
  6. 24 0
      create_topics_response_test.go
  7. 20 1
      mockresponses.go

+ 2 - 2
admin.go

@@ -166,7 +166,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
 	}
 	}
 
 
 	if topicErr.Err != ErrNoError {
 	if topicErr.Err != ErrNoError {
-		return topicErr.Err
+		return topicErr
 	}
 	}
 
 
 	return nil
 	return nil
@@ -354,7 +354,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
 	}
 	}
 
 
 	if topicErr.Err != ErrNoError {
 	if topicErr.Err != ErrNoError {
-		return topicErr.Err
+		return topicErr
 	}
 	}
 
 
 	return nil
 	return nil

+ 60 - 0
admin_test.go

@@ -2,6 +2,7 @@ package sarama
 
 
 import (
 import (
 	"errors"
 	"errors"
+	"strings"
 	"testing"
 	"testing"
 )
 )
 
 
@@ -105,6 +106,36 @@ func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"CreateTopicsRequest": NewMockCreateTopicsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V0_11_0_0
+
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.CreateTopic("_internal_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
+	want := "insufficient permissions to create topic with reserved prefix"
+	if !strings.HasSuffix(err.Error(), want) {
+		t.Fatal(err)
+	}
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
 func TestClusterAdminCreateTopicWithDiffVersion(t *testing.T) {
 func TestClusterAdminCreateTopicWithDiffVersion(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	seedBroker := NewMockBroker(t, 1)
 	defer seedBroker.Close()
 	defer seedBroker.Close()
@@ -297,6 +328,35 @@ func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.CreatePartitions("_internal_topic", 3, nil, false)
+	want := "insufficient permissions to create partition on topic with reserved prefix"
+	if !strings.HasSuffix(err.Error(), want) {
+		t.Fatal(err)
+	}
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
 func TestClusterAdminDeleteRecords(t *testing.T) {
 func TestClusterAdminDeleteRecords(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	seedBroker := NewMockBroker(t, 1)
 	defer seedBroker.Close()
 	defer seedBroker.Close()

+ 12 - 1
create_partitions_response.go

@@ -1,6 +1,9 @@
 package sarama
 package sarama
 
 
-import "time"
+import (
+	"fmt"
+	"time"
+)
 
 
 type CreatePartitionsResponse struct {
 type CreatePartitionsResponse struct {
 	ThrottleTime         time.Duration
 	ThrottleTime         time.Duration
@@ -69,6 +72,14 @@ type TopicPartitionError struct {
 	ErrMsg *string
 	ErrMsg *string
 }
 }
 
 
+func (t *TopicPartitionError) Error() string {
+	text := t.Err.Error()
+	if t.ErrMsg != nil {
+		text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
+	}
+	return text
+}
+
 func (t *TopicPartitionError) encode(pe packetEncoder) error {
 func (t *TopicPartitionError) encode(pe packetEncoder) error {
 	pe.putInt16(int16(t.Err))
 	pe.putInt16(int16(t.Err))
 
 

+ 24 - 0
create_partitions_response_test.go

@@ -50,3 +50,27 @@ func TestCreatePartitionsResponse(t *testing.T) {
 		t.Errorf("Decoding error: expected %v but got %v", decodedresp, resp)
 		t.Errorf("Decoding error: expected %v but got %v", decodedresp, resp)
 	}
 	}
 }
 }
+
+func TestTopicPartitionError(t *testing.T) {
+	// Assert that TopicPartitionError satisfies error interface
+	var err error = &TopicPartitionError{
+		Err: ErrTopicAuthorizationFailed,
+	}
+
+	got := err.Error()
+	want := ErrTopicAuthorizationFailed.Error()
+	if got != want {
+		t.Errorf("TopicPartitionError.Error() = %v; want %v", got, want)
+	}
+
+	msg := "reason why topic authorization failed"
+	err = &TopicPartitionError{
+		Err:    ErrTopicAuthorizationFailed,
+		ErrMsg: &msg,
+	}
+	got = err.Error()
+	want = ErrTopicAuthorizationFailed.Error() + " - " + msg
+	if got != want {
+		t.Errorf("TopicPartitionError.Error() = %v; want %v", got, want)
+	}
+}

+ 12 - 1
create_topics_response.go

@@ -1,6 +1,9 @@
 package sarama
 package sarama
 
 
-import "time"
+import (
+	"fmt"
+	"time"
+)
 
 
 type CreateTopicsResponse struct {
 type CreateTopicsResponse struct {
 	Version      int16
 	Version      int16
@@ -83,6 +86,14 @@ type TopicError struct {
 	ErrMsg *string
 	ErrMsg *string
 }
 }
 
 
+func (t *TopicError) Error() string {
+	text := t.Err.Error()
+	if t.ErrMsg != nil {
+		text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
+	}
+	return text
+}
+
 func (t *TopicError) encode(pe packetEncoder, version int16) error {
 func (t *TopicError) encode(pe packetEncoder, version int16) error {
 	pe.putInt16(int16(t.Err))
 	pe.putInt16(int16(t.Err))
 
 

+ 24 - 0
create_topics_response_test.go

@@ -50,3 +50,27 @@ func TestCreateTopicsResponse(t *testing.T) {
 
 
 	testResponse(t, "version 2", resp, createTopicsResponseV2)
 	testResponse(t, "version 2", resp, createTopicsResponseV2)
 }
 }
+
+func TestTopicError(t *testing.T) {
+	// Assert that TopicError satisfies error interface
+	var err error = &TopicError{
+		Err: ErrTopicAuthorizationFailed,
+	}
+
+	got := err.Error()
+	want := ErrTopicAuthorizationFailed.Error()
+	if got != want {
+		t.Errorf("TopicError.Error() = %v; want %v", got, want)
+	}
+
+	msg := "reason why topic authorization failed"
+	err = &TopicError{
+		Err:    ErrTopicAuthorizationFailed,
+		ErrMsg: &msg,
+	}
+	got = err.Error()
+	want = ErrTopicAuthorizationFailed.Error() + " - " + msg
+	if got != want {
+		t.Errorf("TopicError.Error() = %v; want %v", got, want)
+	}
+}

+ 20 - 1
mockresponses.go

@@ -2,6 +2,7 @@ package sarama
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"strings"
 )
 )
 
 
 // TestReporter has methods matching go's testing.T to avoid importing
 // TestReporter has methods matching go's testing.T to avoid importing
@@ -612,10 +613,20 @@ func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
 
 
 func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
 func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
 	req := reqBody.(*CreateTopicsRequest)
 	req := reqBody.(*CreateTopicsRequest)
-	res := &CreateTopicsResponse{}
+	res := &CreateTopicsResponse{
+		Version: req.Version,
+	}
 	res.TopicErrors = make(map[string]*TopicError)
 	res.TopicErrors = make(map[string]*TopicError)
 
 
 	for topic, _ := range req.TopicDetails {
 	for topic, _ := range req.TopicDetails {
+		if res.Version >= 1 && strings.HasPrefix(topic, "_") {
+			msg := "insufficient permissions to create topic with reserved prefix"
+			res.TopicErrors[topic] = &TopicError{
+				Err:    ErrTopicAuthorizationFailed,
+				ErrMsg: &msg,
+			}
+			continue
+		}
 		res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
 		res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
 	}
 	}
 	return res
 	return res
@@ -654,6 +665,14 @@ func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
 	res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
 	res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
 
 
 	for topic, _ := range req.TopicPartitions {
 	for topic, _ := range req.TopicPartitions {
+		if strings.HasPrefix(topic, "_") {
+			msg := "insufficient permissions to create partition on topic with reserved prefix"
+			res.TopicPartitionErrors[topic] = &TopicPartitionError{
+				Err:    ErrTopicAuthorizationFailed,
+				ErrMsg: &msg,
+			}
+			continue
+		}
 		res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
 		res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
 	}
 	}
 	return res
 	return res