123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382 |
- package sarama
- import "errors"
- type ClusterAdmin interface {
-
-
-
-
- CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
-
-
-
-
-
-
- DeleteTopic(topic string) error
-
-
-
-
-
-
- CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
-
-
- DeleteRecords(topic string, partitionOffsets map[int32]int64) error
-
-
-
-
-
-
-
- DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
-
-
-
-
-
- AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
-
-
-
-
- CreateACL(resource Resource, acl Acl) error
-
-
-
- ListAcls(filter AclFilter) ([]ResourceAcls, error)
-
-
-
- DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
-
- Close() error
- }
- type clusterAdmin struct {
- client Client
- conf *Config
- }
- func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
- client, err := NewClient(addrs, conf)
- if err != nil {
- return nil, err
- }
-
- _, err = client.Controller()
- if err != nil {
- return nil, err
- }
- ca := &clusterAdmin{
- client: client,
- conf: client.Config(),
- }
- return ca, nil
- }
- func (ca *clusterAdmin) Close() error {
- return ca.client.Close()
- }
- func (ca *clusterAdmin) Controller() (*Broker, error) {
- return ca.client.Controller()
- }
- func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
- if topic == "" {
- return ErrInvalidTopic
- }
- if detail == nil {
- return errors.New("You must specify topic details")
- }
- topicDetails := make(map[string]*TopicDetail)
- topicDetails[topic] = detail
- request := &CreateTopicsRequest{
- TopicDetails: topicDetails,
- ValidateOnly: validateOnly,
- Timeout: ca.conf.Admin.Timeout,
- }
- if ca.conf.Version.IsAtLeast(V0_11_0_0) {
- request.Version = 1
- }
- if ca.conf.Version.IsAtLeast(V1_0_0_0) {
- request.Version = 2
- }
- b, err := ca.Controller()
- if err != nil {
- return err
- }
- rsp, err := b.CreateTopics(request)
- if err != nil {
- return err
- }
- topicErr, ok := rsp.TopicErrors[topic]
- if !ok {
- return ErrIncompleteResponse
- }
- if topicErr.Err != ErrNoError {
- return topicErr.Err
- }
- return nil
- }
- func (ca *clusterAdmin) DeleteTopic(topic string) error {
- if topic == "" {
- return ErrInvalidTopic
- }
- request := &DeleteTopicsRequest{
- Topics: []string{topic},
- Timeout: ca.conf.Admin.Timeout,
- }
- if ca.conf.Version.IsAtLeast(V0_11_0_0) {
- request.Version = 1
- }
- b, err := ca.Controller()
- if err != nil {
- return err
- }
- rsp, err := b.DeleteTopics(request)
- if err != nil {
- return err
- }
- topicErr, ok := rsp.TopicErrorCodes[topic]
- if !ok {
- return ErrIncompleteResponse
- }
- if topicErr != ErrNoError {
- return topicErr
- }
- return nil
- }
- func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
- if topic == "" {
- return ErrInvalidTopic
- }
- topicPartitions := make(map[string]*TopicPartition)
- topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
- request := &CreatePartitionsRequest{
- TopicPartitions: topicPartitions,
- Timeout: ca.conf.Admin.Timeout,
- }
- b, err := ca.Controller()
- if err != nil {
- return err
- }
- rsp, err := b.CreatePartitions(request)
- if err != nil {
- return err
- }
- topicErr, ok := rsp.TopicPartitionErrors[topic]
- if !ok {
- return ErrIncompleteResponse
- }
- if topicErr.Err != ErrNoError {
- return topicErr.Err
- }
- return nil
- }
- func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
- if topic == "" {
- return ErrInvalidTopic
- }
- topics := make(map[string]*DeleteRecordsRequestTopic)
- topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets}
- request := &DeleteRecordsRequest{
- Topics: topics,
- Timeout: ca.conf.Admin.Timeout,
- }
- b, err := ca.Controller()
- if err != nil {
- return err
- }
- rsp, err := b.DeleteRecords(request)
- if err != nil {
- return err
- }
- _, ok := rsp.Topics[topic]
- if !ok {
- return ErrIncompleteResponse
- }
-
-
- return nil
- }
- func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
- var entries []ConfigEntry
- var resources []*ConfigResource
- resources = append(resources, &resource)
- request := &DescribeConfigsRequest{
- Resources: resources,
- }
- b, err := ca.Controller()
- if err != nil {
- return nil, err
- }
- rsp, err := b.DescribeConfigs(request)
- if err != nil {
- return nil, err
- }
- for _, rspResource := range rsp.Resources {
- if rspResource.Name == resource.Name {
- if rspResource.ErrorMsg != "" {
- return nil, errors.New(rspResource.ErrorMsg)
- }
- for _, cfgEntry := range rspResource.Configs {
- entries = append(entries, *cfgEntry)
- }
- }
- }
- return entries, nil
- }
- func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
- var resources []*AlterConfigsResource
- resources = append(resources, &AlterConfigsResource{
- Type: resourceType,
- Name: name,
- ConfigEntries: entries,
- })
- request := &AlterConfigsRequest{
- Resources: resources,
- ValidateOnly: validateOnly,
- }
- b, err := ca.Controller()
- if err != nil {
- return err
- }
- rsp, err := b.AlterConfigs(request)
- if err != nil {
- return err
- }
- for _, rspResource := range rsp.Resources {
- if rspResource.Name == name {
- if rspResource.ErrorMsg != "" {
- return errors.New(rspResource.ErrorMsg)
- }
- }
- }
- return nil
- }
- func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
- var acls []*AclCreation
- acls = append(acls, &AclCreation{resource, acl})
- request := &CreateAclsRequest{AclCreations: acls}
- b, err := ca.Controller()
- if err != nil {
- return err
- }
- _, err = b.CreateAcls(request)
- return err
- }
- func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
- request := &DescribeAclsRequest{AclFilter: filter}
- b, err := ca.Controller()
- if err != nil {
- return nil, err
- }
- rsp, err := b.DescribeAcls(request)
- if err != nil {
- return nil, err
- }
- var lAcls []ResourceAcls
- for _, rAcl := range rsp.ResourceAcls {
- lAcls = append(lAcls, *rAcl)
- }
- return lAcls, nil
- }
- func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
- var filters []*AclFilter
- filters = append(filters, &filter)
- request := &DeleteAclsRequest{Filters: filters}
- b, err := ca.Controller()
- if err != nil {
- return nil, err
- }
- rsp, err := b.DeleteAcls(request)
- if err != nil {
- return nil, err
- }
- var mAcls []MatchingAcl
- for _, fr := range rsp.FilterResponses {
- for _, mACL := range fr.MatchingAcls {
- mAcls = append(mAcls, *mACL)
- }
- }
- return mAcls, nil
- }
|