소스 검색

Merge pull request #1055 from chandradeepak/master

ClusterAdmin Support
Evan Huus 7 년 전
부모
커밋
46cf3e2cf1
4개의 변경된 파일1069개의 추가작업 그리고 6개의 파일을 삭제
  1. 375 0
      admin.go
  2. 501 0
      admin_test.go
  3. 6 6
      broker.go
  4. 187 0
      mockresponses.go

+ 375 - 0
admin.go

@@ -0,0 +1,375 @@
+package sarama
+
+import "errors"
+
+// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
+// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
+// Methods with stricter requirements will specify the minimum broker version required.
+// You MUST call Close() on a client to avoid leaks
+type ClusterAdmin interface {
+	// Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
+	// It may take several seconds after CreateTopic returns success for all the brokers
+	// to become aware that the topic has been created. During this time, listTopics
+	// may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
+	CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
+
+	// Delete a topic. It may take several seconds after the DeleteTopic to returns success
+	// and for all the brokers to become aware that the topics are gone.
+	// During this time, listTopics  may continue to return information about the deleted topic.
+	// If delete.topic.enable is false on the brokers, deleteTopic will mark
+	// the topic for deletion, but not actually delete them.
+	// This operation is supported by brokers with version 0.10.1.0 or higher.
+	DeleteTopic(topic string) error
+
+	// Increase the number of partitions of the topics  according to the corresponding values.
+	// If partitions are increased for a topic that has a key, the partition logic or ordering of
+	// the messages will be affected. It may take several seconds after this method returns
+	// success for all the brokers to become aware that the partitions have been created.
+	// During this time, ClusterAdmin#describeTopics may not return information about the
+	// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
+	CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
+
+	// Delete records whose offset is smaller than the given offset of the corresponding partition.
+	// This operation is supported by brokers with version 0.11.0.0 or higher.
+	DeleteRecords(topic string, partitionOffsets map[int32]int64) error
+
+	// Get the configuration for the specified resources.
+	// The returned configuration includes default values and the Default is true
+	// can be used to distinguish them from user supplied values.
+	// Config entries where ReadOnly is true cannot be updated.
+	// The value of config entries where Sensitive is true is always nil so
+	// sensitive information is not disclosed.
+	// This operation is supported by brokers with version 0.11.0.0 or higher.
+	DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
+
+	// Update the configuration for the specified resources with the default options.
+	// This operation is supported by brokers with version 0.11.0.0 or higher.
+	// The resources with their configs (topic is the only resource type with configs
+	// that can be updated currently Updates are not transactional so they may succeed
+	// for some resources while fail for others. The configs for a particular resource are updated automatically.
+	AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
+
+	// Creates access control lists (ACLs) which are bound to specific resources.
+	// This operation is not transactional so it may succeed for some ACLs while fail for others.
+	// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
+	// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
+	CreateACL(resource Resource, acl Acl) error
+
+	// Lists access control lists (ACLs) according to the supplied filter.
+	// it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
+	// This operation is supported by brokers with version 0.11.0.0 or higher.
+	ListAcls(filter AclFilter) ([]ResourceAcls, error)
+
+	// Deletes access control lists (ACLs) according to the supplied filters.
+	// This operation is not transactional so it may succeed for some ACLs while fail for others.
+	// This operation is supported by brokers with version 0.11.0.0 or higher.
+	DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
+
+	// Close shuts down the admin and closes underlying client.
+	Close() error
+}
+
+type clusterAdmin struct {
+	client Client
+	conf   *Config
+}
+
+// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
+func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
+	client, err := NewClient(addrs, conf)
+	if err != nil {
+		return nil, err
+	}
+
+	//make sure we can retrieve the controller
+	_, err = client.Controller()
+	if err != nil {
+		return nil, err
+	}
+
+	ca := &clusterAdmin{
+		client: client,
+		conf:   client.Config(),
+	}
+	return ca, nil
+}
+
+func (ca *clusterAdmin) Close() error {
+	return ca.client.Close()
+}
+
+func (ca *clusterAdmin) Controller() (*Broker, error) {
+	return ca.client.Controller()
+}
+
+func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
+
+	if topic == "" {
+		return ErrInvalidTopic
+	}
+
+	if detail == nil {
+		return errors.New("You must specify topic details")
+	}
+
+	topicDetails := make(map[string]*TopicDetail)
+	topicDetails[topic] = detail
+
+	request := &CreateTopicsRequest{
+		TopicDetails: topicDetails,
+		ValidateOnly: validateOnly,
+	}
+
+	if ca.conf.Version.IsAtLeast(V0_11_0_0) {
+		request.Version = 1
+	}
+	if ca.conf.Version.IsAtLeast(V1_0_0_0) {
+		request.Version = 2
+	}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return err
+	}
+
+	rsp, err := b.CreateTopics(request)
+	if err != nil {
+		return err
+	}
+
+	topicErr, ok := rsp.TopicErrors[topic]
+	if !ok {
+		return ErrIncompleteResponse
+	}
+
+	if topicErr.Err != ErrNoError {
+		return topicErr.Err
+	}
+
+	return nil
+}
+
+func (ca *clusterAdmin) DeleteTopic(topic string) error {
+
+	if topic == "" {
+		return ErrInvalidTopic
+	}
+
+	request := &DeleteTopicsRequest{Topics: []string{topic}}
+
+	if ca.conf.Version.IsAtLeast(V0_11_0_0) {
+		request.Version = 1
+	}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return err
+	}
+
+	rsp, err := b.DeleteTopics(request)
+	if err != nil {
+		return err
+	}
+
+	topicErr, ok := rsp.TopicErrorCodes[topic]
+	if !ok {
+		return ErrIncompleteResponse
+	}
+
+	if topicErr != ErrNoError {
+		return topicErr
+	}
+	return nil
+}
+
+func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
+	if topic == "" {
+		return ErrInvalidTopic
+	}
+
+	topicPartitions := make(map[string]*TopicPartition)
+	topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
+
+	request := &CreatePartitionsRequest{
+		TopicPartitions: topicPartitions,
+	}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return err
+	}
+
+	rsp, err := b.CreatePartitions(request)
+	if err != nil {
+		return err
+	}
+
+	topicErr, ok := rsp.TopicPartitionErrors[topic]
+	if !ok {
+		return ErrIncompleteResponse
+	}
+
+	if topicErr.Err != ErrNoError {
+		return topicErr.Err
+	}
+
+	return nil
+}
+
+func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
+
+	if topic == "" {
+		return ErrInvalidTopic
+	}
+
+	topics := make(map[string]*DeleteRecordsRequestTopic)
+	topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets}
+	request := &DeleteRecordsRequest{
+		Topics: topics}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return err
+	}
+
+	rsp, err := b.DeleteRecords(request)
+	if err != nil {
+		return err
+	}
+
+	_, ok := rsp.Topics[topic]
+	if !ok {
+		return ErrIncompleteResponse
+	}
+
+	//todo since we are dealing with couple of partitions it would be good if we return slice of errors
+	//for each partition instead of one error
+	return nil
+}
+
+func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
+
+	var entries []ConfigEntry
+	var resources []*ConfigResource
+	resources = append(resources, &resource)
+
+	request := &DescribeConfigsRequest{
+		Resources: resources,
+	}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return nil, err
+	}
+
+	rsp, err := b.DescribeConfigs(request)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, rspResource := range rsp.Resources {
+		if rspResource.Name == resource.Name {
+			if rspResource.ErrorMsg != "" {
+				return nil, errors.New(rspResource.ErrorMsg)
+			}
+			for _, cfgEntry := range rspResource.Configs {
+				entries = append(entries, *cfgEntry)
+			}
+		}
+	}
+	return entries, nil
+}
+
+func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
+
+	var resources []*AlterConfigsResource
+	resources = append(resources, &AlterConfigsResource{
+		Type:          resourceType,
+		Name:          name,
+		ConfigEntries: entries,
+	})
+
+	request := &AlterConfigsRequest{
+		Resources:    resources,
+		ValidateOnly: validateOnly,
+	}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return err
+	}
+
+	rsp, err := b.AlterConfigs(request)
+	if err != nil {
+		return err
+	}
+
+	for _, rspResource := range rsp.Resources {
+		if rspResource.Name == name {
+			if rspResource.ErrorMsg != "" {
+				return errors.New(rspResource.ErrorMsg)
+			}
+		}
+	}
+	return nil
+}
+
+func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
+	var acls []*AclCreation
+	acls = append(acls, &AclCreation{resource, acl})
+	request := &CreateAclsRequest{AclCreations: acls}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return err
+	}
+
+	_, err = b.CreateAcls(request)
+	return err
+}
+
+func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
+
+	request := &DescribeAclsRequest{AclFilter: filter}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return nil, err
+	}
+
+	rsp, err := b.DescribeAcls(request)
+	if err != nil {
+		return nil, err
+	}
+
+	var lAcls []ResourceAcls
+	for _, rAcl := range rsp.ResourceAcls {
+		lAcls = append(lAcls, *rAcl)
+	}
+	return lAcls, nil
+}
+
+func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
+	var filters []*AclFilter
+	filters = append(filters, &filter)
+	request := &DeleteAclsRequest{Filters: filters}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return nil, err
+	}
+
+	rsp, err := b.DeleteAcls(request)
+	if err != nil {
+		return nil, err
+	}
+
+	var mAcls []MatchingAcl
+	for _, fr := range rsp.FilterResponses {
+		for _, mACL := range fr.MatchingAcls {
+			mAcls = append(mAcls, *mACL)
+		}
+
+	}
+	return mAcls, nil
+}

+ 501 - 0
admin_test.go

@@ -0,0 +1,501 @@
+package sarama
+
+import (
+	"errors"
+	"testing"
+)
+
+func TestClusterAdmin(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminInvalidController(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+	_, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err == nil {
+		t.Fatal(errors.New("controller not set still cluster admin was created"))
+	}
+
+	if err != ErrControllerNotAvailable {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminCreateTopic(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"CreateTopicsRequest": NewMockCreateTopicsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V0_10_2_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"CreateTopicsRequest": NewMockCreateTopicsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V0_10_2_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.CreateTopic("my_topic", nil, false)
+	if err.Error() != "You must specify topic details" {
+		t.Fatal(err)
+	}
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminCreateTopicWithDiffVersion(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"CreateTopicsRequest": NewMockCreateTopicsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V0_11_0_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
+	if err != ErrInsufficientData {
+		t.Fatal(err)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminDeleteTopic(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V0_10_2_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.DeleteTopic("my_topic")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminDeleteEmptyTopic(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V0_10_2_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.DeleteTopic("")
+	if err != ErrInvalidTopic {
+		t.Fatal(err)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminCreatePartitions(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.CreatePartitions("my_topic", 3, nil, false)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V0_10_2_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.CreatePartitions("my_topic", 3, nil, false)
+	if err != ErrUnsupportedVersion {
+		t.Fatal(err)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminDeleteRecords(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	partitionOffset := make(map[int32]int64)
+	partitionOffset[1] = 1000
+	partitionOffset[2] = 1000
+	partitionOffset[3] = 1000
+
+	err = admin.DeleteRecords("my_topic", partitionOffset)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V0_10_2_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	partitionOffset := make(map[int32]int64)
+	partitionOffset[1] = 1000
+	partitionOffset[2] = 1000
+	partitionOffset[3] = 1000
+
+	err = admin.DeleteRecords("my_topic", partitionOffset)
+	if err != ErrUnsupportedVersion {
+		t.Fatal(err)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminDescribeConfig(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	resource := ConfigResource{Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}}
+	entries, err := admin.DescribeConfig(resource)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(entries) <= 0 {
+		t.Fatal(errors.New("no resource present"))
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminAlterConfig(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"AlterConfigsRequest": NewMockAlterConfigsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var value string
+	entries := make(map[string]*string)
+	value = "3"
+	entries["ReplicationFactor"] = &value
+	err = admin.AlterConfig(TopicResource, "my_topic", entries, false)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminCreateAcl(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"CreateAclsRequest": NewMockCreateAclsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
+	a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
+
+	err = admin.CreateACL(r, a)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminListAcls(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"DescribeAclsRequest": NewMockListAclsResponse(t),
+		"CreateAclsRequest":   NewMockCreateAclsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
+	a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
+
+	err = admin.CreateACL(r, a)
+	if err != nil {
+		t.Fatal(err)
+	}
+	resourceName := "my_topic"
+	filter := AclFilter{
+		ResourceType: AclResourceTopic,
+		Operation:    AclOperationRead,
+		ResourceName: &resourceName,
+	}
+
+	rAcls, err := admin.ListAcls(filter)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(rAcls) <= 0 {
+		t.Fatal("no acls present")
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminDeleteAcl(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"DeleteAclsRequest": NewMockDeleteAclsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	resourceName := "my_topic"
+	filter := AclFilter{
+		ResourceType: AclResourceTopic,
+		Operation:    AclOperationAlter,
+		ResourceName: &resourceName,
+	}
+
+	_, err = admin.DeleteACL(filter, false)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}

+ 6 - 6
broker.go

@@ -386,8 +386,8 @@ func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse,
 	return response, nil
 }
 
-func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
-	response := new(CreatePartitionsResponse)
+func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
+	response := new(CreateTopicsResponse)
 
 	err := b.sendAndReceive(request, response)
 	if err != nil {
@@ -397,8 +397,8 @@ func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePart
 	return response, nil
 }
 
-func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
-	response := new(CreateTopicsResponse)
+func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
+	response := new(DeleteTopicsResponse)
 
 	err := b.sendAndReceive(request, response)
 	if err != nil {
@@ -408,8 +408,8 @@ func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsRespon
 	return response, nil
 }
 
-func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
-	response := new(DeleteTopicsResponse)
+func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
+	response := new(CreatePartitionsResponse)
 
 	err := b.sendAndReceive(request, response)
 	if err != nil {

+ 187 - 0
mockresponses.go

@@ -538,3 +538,190 @@ func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
 	}
 	return res
 }
+
+type MockCreateTopicsResponse struct {
+	t TestReporter
+}
+
+func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
+	return &MockCreateTopicsResponse{t: t}
+}
+
+func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
+	req := reqBody.(*CreateTopicsRequest)
+	res := &CreateTopicsResponse{}
+	res.TopicErrors = make(map[string]*TopicError)
+
+	for topic, _ := range req.TopicDetails {
+		res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
+	}
+	return res
+}
+
+type MockDeleteTopicsResponse struct {
+	t TestReporter
+}
+
+func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
+	return &MockDeleteTopicsResponse{t: t}
+}
+
+func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder {
+	req := reqBody.(*DeleteTopicsRequest)
+	res := &DeleteTopicsResponse{}
+	res.TopicErrorCodes = make(map[string]KError)
+
+	for _, topic := range req.Topics {
+		res.TopicErrorCodes[topic] = ErrNoError
+	}
+	return res
+}
+
+type MockCreatePartitionsResponse struct {
+	t TestReporter
+}
+
+func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
+	return &MockCreatePartitionsResponse{t: t}
+}
+
+func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
+	req := reqBody.(*CreatePartitionsRequest)
+	res := &CreatePartitionsResponse{}
+	res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
+
+	for topic, _ := range req.TopicPartitions {
+		res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
+	}
+	return res
+}
+
+type MockDeleteRecordsResponse struct {
+	t TestReporter
+}
+
+func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
+	return &MockDeleteRecordsResponse{t: t}
+}
+
+func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder {
+	req := reqBody.(*DeleteRecordsRequest)
+	res := &DeleteRecordsResponse{}
+	res.Topics = make(map[string]*DeleteRecordsResponseTopic)
+
+	for topic, deleteRecordRequestTopic := range req.Topics {
+		partitions := make(map[int32]*DeleteRecordsResponsePartition)
+		for partition, _ := range deleteRecordRequestTopic.PartitionOffsets {
+			partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
+		}
+		res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
+	}
+	return res
+}
+
+type MockDescribeConfigsResponse struct {
+	t TestReporter
+}
+
+func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
+	return &MockDescribeConfigsResponse{t: t}
+}
+
+func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
+	req := reqBody.(*DescribeConfigsRequest)
+	res := &DescribeConfigsResponse{}
+
+	var configEntries []*ConfigEntry
+	configEntries = append(configEntries, &ConfigEntry{Name: "my_topic",
+		Value:     "my_topic",
+		ReadOnly:  true,
+		Default:   true,
+		Sensitive: false,
+	})
+
+	for _, r := range req.Resources {
+		res.Resources = append(res.Resources, &ResourceResponse{Name: r.Name, Configs: configEntries})
+	}
+	return res
+}
+
+type MockAlterConfigsResponse struct {
+	t TestReporter
+}
+
+func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
+	return &MockAlterConfigsResponse{t: t}
+}
+
+func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {
+	req := reqBody.(*AlterConfigsRequest)
+	res := &AlterConfigsResponse{}
+
+	for _, r := range req.Resources {
+		res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
+			Type:     TopicResource,
+			ErrorMsg: "",
+		})
+	}
+	return res
+}
+
+type MockCreateAclsResponse struct {
+	t TestReporter
+}
+
+func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
+	return &MockCreateAclsResponse{t: t}
+}
+
+func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder {
+	req := reqBody.(*CreateAclsRequest)
+	res := &CreateAclsResponse{}
+
+	for range req.AclCreations {
+		res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
+	}
+	return res
+}
+
+type MockListAclsResponse struct {
+	t TestReporter
+}
+
+func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
+	return &MockListAclsResponse{t: t}
+}
+
+func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder {
+	req := reqBody.(*DescribeAclsRequest)
+	res := &DescribeAclsResponse{}
+
+	res.Err = ErrNoError
+	acl := &ResourceAcls{}
+	acl.Resource.ResourceName = *req.ResourceName
+	acl.Resource.ResourceType = req.ResourceType
+	acl.Acls = append(acl.Acls, &Acl{})
+	res.ResourceAcls = append(res.ResourceAcls, acl)
+
+	return res
+}
+
+type MockDeleteAclsResponse struct {
+	t TestReporter
+}
+
+func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
+	return &MockDeleteAclsResponse{t: t}
+}
+
+func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder {
+	req := reqBody.(*DeleteAclsRequest)
+	res := &DeleteAclsResponse{}
+
+	for range req.Filters {
+		response := &FilterResponse{Err: ErrNoError}
+		response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
+		res.FilterResponses = append(res.FilterResponses, response)
+	}
+	return res
+}