package sarama

import (
	"errors"
	"fmt"
	"math/rand"
	"strconv"
	"sync"
	"time"
)

// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
// Methods with stricter requirements will specify the minimum broker version required.
// You MUST call Close() on a client to avoid leaks.
type ClusterAdmin interface {
	// Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
	// It may take several seconds after CreateTopic returns success for all the brokers
	// to become aware that the topic has been created. During this time, ListTopics
	// may not return information about the new topic. The validateOnly option is
	// supported from version 0.10.2.0.
	CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error

	// List the topics available in the cluster with the default options.
	ListTopics() (map[string]TopicDetail, error)

	// Describe some topics in the cluster.
	DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)

	// Delete a topic. It may take several seconds after DeleteTopic returns success
	// for all the brokers to become aware that the topic is gone.
	// During this time, ListTopics may continue to return information about the deleted topic.
	// If delete.topic.enable is false on the brokers, DeleteTopic will mark
	// the topic for deletion, but not actually delete it.
	// This operation is supported by brokers with version 0.10.1.0 or higher.
	DeleteTopic(topic string) error

	// Increase the number of partitions of the topics according to the corresponding values.
	// If partitions are increased for a topic that has a key, the partition logic or ordering of
	// the messages will be affected. It may take several seconds after this method returns
	// success for all the brokers to become aware that the partitions have been created.
	// During this time, DescribeTopics may not return information about the
	// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
	CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

	// Delete records whose offset is smaller than the given offset of the corresponding partition.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	DeleteRecords(topic string, partitionOffsets map[int32]int64) error

	// Get the configuration for the specified resources.
	// The returned configuration includes default values; the Default flag
	// can be used to distinguish them from user-supplied values.
	// Config entries where ReadOnly is true cannot be updated.
	// The value of config entries where Sensitive is true is always nil so
	// sensitive information is not disclosed.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)

	// Update the configuration for the specified resources with the default options.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	// Topic is currently the only resource type whose configs can be updated.
	// Updates are not transactional, so they may succeed for some resources while
	// failing for others. The configs for a particular resource are updated atomically.
	AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error

	// Creates access control lists (ACLs) which are bound to specific resources.
	// This operation is not transactional, so it may succeed for some ACLs while failing for others.
	// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
	// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
	CreateACL(resource Resource, acl Acl) error

	// Lists access control lists (ACLs) according to the supplied filter.
	// It may take some time for changes made by CreateACL or DeleteACL to be reflected in the output of ListAcls.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	ListAcls(filter AclFilter) ([]ResourceAcls, error)

	// Deletes access control lists (ACLs) according to the supplied filters.
	// This operation is not transactional, so it may succeed for some ACLs while failing for others.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)

	// List the consumer groups available in the cluster.
	ListConsumerGroups() (map[string]string, error)

	// Describe the given consumer groups.
	DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)

	// List the consumer group offsets available in the cluster.
	ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)

	// Delete a consumer group.
	DeleteConsumerGroup(group string) error

	// Get information about the nodes in the cluster.
	DescribeCluster() (brokers []*Broker, controllerID int32, err error)

	// Close shuts down the admin and closes the underlying client.
	Close() error
}

type clusterAdmin struct {
	client Client
	conf   *Config
}

// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}
	return NewClusterAdminFromClient(client)
}
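
// Usage sketch (illustrative, not part of the package API): create an admin
// client against a placeholder broker address and make sure it is closed, as
// the ClusterAdmin contract above requires.
//
//	conf := NewConfig()
//	conf.Version = V1_0_0_0 // match your broker version
//	admin, err := NewClusterAdmin([]string{"localhost:9092"}, conf)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer admin.Close()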

// NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
// Note that the underlying client will also be closed on the admin's Close() call.
func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
	// make sure we can retrieve the controller
	_, err := client.Controller()
	if err != nil {
		return nil, err
	}

	ca := &clusterAdmin{
		client: client,
		conf:   client.Config(),
	}
	return ca, nil
}

func (ca *clusterAdmin) Close() error {
	return ca.client.Close()
}

func (ca *clusterAdmin) Controller() (*Broker, error) {
	return ca.client.Controller()
}

func (ca *clusterAdmin) refreshController() (*Broker, error) {
	return ca.client.RefreshController()
}

// isErrNoController returns `true` if the given error type unwraps to an
// `ErrNotController` response from Kafka
func isErrNoController(err error) bool {
	switch e := err.(type) {
	case *TopicError:
		return e.Err == ErrNotController
	case *TopicPartitionError:
		return e.Err == ErrNotController
	case KError:
		return e == ErrNotController
	}
	return false
}

// retryOnError will repeatedly call the given (error-returning) func in the
// case that its error is non-nil and retriable (as determined by the
// provided retriable func), up to the maximum number of tries permitted by
// the admin client configuration.
func (ca *clusterAdmin) retryOnError(retriable func(error) bool, fn func() error) error {
	var err error
	for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
		err = fn()
		if err == nil || !retriable(err) {
			return err
		}
		Logger.Printf(
			"admin/request retrying after %dms... (%d attempts remaining)\n",
			ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
		time.Sleep(ca.conf.Admin.Retry.Backoff)
	}
	return err
}
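
// Illustrative sketch: the knobs consulted by retryOnError live under
// Admin.Retry in the configuration. The values below are assumptions for the
// example, not recommendations.
//
//	conf := NewConfig()
//	conf.Admin.Retry.Max = 5
//	conf.Admin.Retry.Backoff = 100 * time.Millisecond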

func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
	if topic == "" {
		return ErrInvalidTopic
	}
	if detail == nil {
		return errors.New("you must specify topic details")
	}

	topicDetails := make(map[string]*TopicDetail)
	topicDetails[topic] = detail

	request := &CreateTopicsRequest{
		TopicDetails: topicDetails,
		ValidateOnly: validateOnly,
		Timeout:      ca.conf.Admin.Timeout,
	}

	if ca.conf.Version.IsAtLeast(V0_11_0_0) {
		request.Version = 1
	}
	if ca.conf.Version.IsAtLeast(V1_0_0_0) {
		request.Version = 2
	}

	return ca.retryOnError(isErrNoController, func() error {
		b, err := ca.Controller()
		if err != nil {
			return err
		}

		rsp, err := b.CreateTopics(request)
		if err != nil {
			return err
		}

		topicErr, ok := rsp.TopicErrors[topic]
		if !ok {
			return ErrIncompleteResponse
		}

		if topicErr.Err != ErrNoError {
			if topicErr.Err == ErrNotController {
				_, _ = ca.refreshController()
			}
			return topicErr
		}

		return nil
	})
}
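
// Usage sketch for CreateTopic (topic name and sizing are placeholders):
//
//	err := admin.CreateTopic("my-topic", &TopicDetail{
//		NumPartitions:     3,
//		ReplicationFactor: 2,
//	}, false)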

func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
	controller, err := ca.Controller()
	if err != nil {
		return nil, err
	}

	request := &MetadataRequest{
		Topics:                 topics,
		AllowAutoTopicCreation: false,
	}

	if ca.conf.Version.IsAtLeast(V1_0_0_0) {
		request.Version = 5
	} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
		request.Version = 4
	}

	response, err := controller.GetMetadata(request)
	if err != nil {
		return nil, err
	}
	return response.Topics, nil
}
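
// Usage sketch for DescribeTopics, printing the partition count per topic:
//
//	metadata, err := admin.DescribeTopics([]string{"my-topic"})
//	if err == nil {
//		for _, t := range metadata {
//			fmt.Printf("%s: %d partitions\n", t.Name, len(t.Partitions))
//		}
//	}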

func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
	controller, err := ca.Controller()
	if err != nil {
		return nil, int32(0), err
	}

	request := &MetadataRequest{
		Topics: []string{},
	}

	if ca.conf.Version.IsAtLeast(V0_10_0_0) {
		request.Version = 1
	}

	response, err := controller.GetMetadata(request)
	if err != nil {
		return nil, int32(0), err
	}

	return response.Brokers, response.ControllerID, nil
}

func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
	brokers := ca.client.Brokers()
	for _, b := range brokers {
		if b.ID() == id {
			return b, nil
		}
	}
	return nil, fmt.Errorf("could not find broker id %d", id)
}

func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
	brokers := ca.client.Brokers()
	if len(brokers) > 0 {
		index := rand.Intn(len(brokers))
		return brokers[index], nil
	}
	return nil, errors.New("no available broker")
}

func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
	// In order to build TopicDetails we need to first get the list of all
	// topics using a MetadataRequest and then get their configs using a
	// DescribeConfigsRequest. To avoid sending many requests to the
	// broker, we use a single DescribeConfigsRequest.

	// Send the all-topic MetadataRequest
	b, err := ca.findAnyBroker()
	if err != nil {
		return nil, err
	}
	_ = b.Open(ca.client.Config())

	metadataReq := &MetadataRequest{}
	metadataResp, err := b.GetMetadata(metadataReq)
	if err != nil {
		return nil, err
	}

	topicsDetailsMap := make(map[string]TopicDetail)

	var describeConfigsResources []*ConfigResource

	for _, topic := range metadataResp.Topics {
		topicDetails := TopicDetail{
			NumPartitions: int32(len(topic.Partitions)),
		}
		if len(topic.Partitions) > 0 {
			topicDetails.ReplicaAssignment = map[int32][]int32{}
			for _, partition := range topic.Partitions {
				topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
			}
			topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
		}
		topicsDetailsMap[topic.Name] = topicDetails

		// we populate the resources we want to describe from the MetadataResponse
		topicResource := ConfigResource{
			Type: TopicResource,
			Name: topic.Name,
		}
		describeConfigsResources = append(describeConfigsResources, &topicResource)
	}

	// Send the DescribeConfigsRequest
	describeConfigsReq := &DescribeConfigsRequest{
		Resources: describeConfigsResources,
	}

	if ca.conf.Version.IsAtLeast(V1_1_0_0) {
		describeConfigsReq.Version = 1
	}
	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
		describeConfigsReq.Version = 2
	}

	describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
	if err != nil {
		return nil, err
	}

	for _, resource := range describeConfigsResp.Resources {
		topicDetails := topicsDetailsMap[resource.Name]
		topicDetails.ConfigEntries = make(map[string]*string)

		for _, entry := range resource.Configs {
			// only include non-default non-sensitive config
			// (don't actually think topic config will ever be sensitive)
			if entry.Default || entry.Sensitive {
				continue
			}
			topicDetails.ConfigEntries[entry.Name] = &entry.Value
		}

		topicsDetailsMap[resource.Name] = topicDetails
	}

	return topicsDetailsMap, nil
}
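
// Usage sketch for ListTopics: the result is keyed by topic name, and
// ConfigEntries only contains non-default, non-sensitive configs.
//
//	topics, err := admin.ListTopics()
//	if err == nil {
//		for name, detail := range topics {
//			fmt.Printf("%s: %d partitions\n", name, detail.NumPartitions)
//		}
//	}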

func (ca *clusterAdmin) DeleteTopic(topic string) error {
	if topic == "" {
		return ErrInvalidTopic
	}

	request := &DeleteTopicsRequest{
		Topics:  []string{topic},
		Timeout: ca.conf.Admin.Timeout,
	}

	if ca.conf.Version.IsAtLeast(V0_11_0_0) {
		request.Version = 1
	}

	return ca.retryOnError(isErrNoController, func() error {
		b, err := ca.Controller()
		if err != nil {
			return err
		}

		rsp, err := b.DeleteTopics(request)
		if err != nil {
			return err
		}

		topicErr, ok := rsp.TopicErrorCodes[topic]
		if !ok {
			return ErrIncompleteResponse
		}

		if topicErr != ErrNoError {
			if topicErr == ErrNotController {
				_, _ = ca.refreshController()
			}
			return topicErr
		}

		return nil
	})
}

func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
	if topic == "" {
		return ErrInvalidTopic
	}

	topicPartitions := make(map[string]*TopicPartition)
	topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}

	request := &CreatePartitionsRequest{
		TopicPartitions: topicPartitions,
		Timeout:         ca.conf.Admin.Timeout,
	}

	return ca.retryOnError(isErrNoController, func() error {
		b, err := ca.Controller()
		if err != nil {
			return err
		}

		rsp, err := b.CreatePartitions(request)
		if err != nil {
			return err
		}

		topicErr, ok := rsp.TopicPartitionErrors[topic]
		if !ok {
			return ErrIncompleteResponse
		}

		if topicErr.Err != ErrNoError {
			if topicErr.Err == ErrNotController {
				_, _ = ca.refreshController()
			}
			return topicErr
		}

		return nil
	})
}
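
// Usage sketch for CreatePartitions: grow a placeholder topic to 12
// partitions, letting the brokers choose the assignment (nil) and applying
// the change for real (validateOnly=false).
//
//	err := admin.CreatePartitions("my-topic", 12, nil, false)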

func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
	if topic == "" {
		return ErrInvalidTopic
	}

	partitionPerBroker := make(map[*Broker][]int32)
	for partition := range partitionOffsets {
		broker, err := ca.client.Leader(topic, partition)
		if err != nil {
			return err
		}
		partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
	}

	errs := make([]error, 0)
	for broker, partitions := range partitionPerBroker {
		topics := make(map[string]*DeleteRecordsRequestTopic)
		recordsToDelete := make(map[int32]int64)
		for _, p := range partitions {
			recordsToDelete[p] = partitionOffsets[p]
		}
		topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
		request := &DeleteRecordsRequest{
			Topics:  topics,
			Timeout: ca.conf.Admin.Timeout,
		}

		rsp, err := broker.DeleteRecords(request)
		if err != nil {
			errs = append(errs, err)
			continue
		}

		deleteRecordsResponseTopic, ok := rsp.Topics[topic]
		if !ok {
			errs = append(errs, ErrIncompleteResponse)
			continue
		}

		for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
			if deleteRecordsResponsePartition.Err != ErrNoError {
				errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
			}
		}
	}
	if len(errs) > 0 {
		return ErrDeleteRecords{MultiError{&errs}}
	}
	// TODO: since we are dealing with multiple partitions, it would be better to
	// return a slice of errors, one per partition, instead of a single error.
	return nil
}
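
// Usage sketch for DeleteRecords: truncate partitions 0 and 1 of a
// placeholder topic below the given offsets.
//
//	err := admin.DeleteRecords("my-topic", map[int32]int64{
//		0: 1000,
//		1: 2500,
//	})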

// Returns a bool indicating whether the resource request needs to go to a
// specific broker
func dependsOnSpecificNode(resource ConfigResource) bool {
	return (resource.Type == BrokerResource && resource.Name != "") ||
		resource.Type == BrokerLoggerResource
}

func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
	var entries []ConfigEntry
	var resources []*ConfigResource
	resources = append(resources, &resource)

	request := &DescribeConfigsRequest{
		Resources: resources,
	}

	if ca.conf.Version.IsAtLeast(V1_1_0_0) {
		request.Version = 1
	}
	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
		request.Version = 2
	}

	var (
		b   *Broker
		err error
	)

	// DescribeConfig of broker/broker logger must be sent to the broker in question
	if dependsOnSpecificNode(resource) {
		id, _ := strconv.Atoi(resource.Name)
		b, err = ca.findBroker(int32(id))
	} else {
		b, err = ca.findAnyBroker()
	}
	if err != nil {
		return nil, err
	}
	_ = b.Open(ca.client.Config())

	rsp, err := b.DescribeConfigs(request)
	if err != nil {
		return nil, err
	}

	for _, rspResource := range rsp.Resources {
		if rspResource.Name == resource.Name {
			if rspResource.ErrorMsg != "" {
				return nil, errors.New(rspResource.ErrorMsg)
			}
			for _, cfgEntry := range rspResource.Configs {
				entries = append(entries, *cfgEntry)
			}
		}
	}
	return entries, nil
}
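
// Usage sketch for DescribeConfig on a topic resource (name is a placeholder):
//
//	entries, err := admin.DescribeConfig(ConfigResource{
//		Type: TopicResource,
//		Name: "my-topic",
//	})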

func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
	var resources []*AlterConfigsResource
	resources = append(resources, &AlterConfigsResource{
		Type:          resourceType,
		Name:          name,
		ConfigEntries: entries,
	})

	request := &AlterConfigsRequest{
		Resources:    resources,
		ValidateOnly: validateOnly,
	}

	var (
		b   *Broker
		err error
	)

	// AlterConfig of broker/broker logger must be sent to the broker in question
	if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
		id, _ := strconv.Atoi(name)
		b, err = ca.findBroker(int32(id))
	} else {
		b, err = ca.findAnyBroker()
	}
	if err != nil {
		return err
	}
	_ = b.Open(ca.client.Config())

	rsp, err := b.AlterConfigs(request)
	if err != nil {
		return err
	}

	for _, rspResource := range rsp.Resources {
		if rspResource.Name == name {
			if rspResource.ErrorMsg != "" {
				return errors.New(rspResource.ErrorMsg)
			}
		}
	}
	return nil
}
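
// Usage sketch for AlterConfig: set retention.ms on a placeholder topic. With
// the non-incremental AlterConfigs API used here, the supplied entries replace
// the resource's dynamic config, so omitted keys may revert to defaults.
//
//	retention := "86400000"
//	err := admin.AlterConfig(TopicResource, "my-topic", map[string]*string{
//		"retention.ms": &retention,
//	}, false)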

func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
	var acls []*AclCreation
	acls = append(acls, &AclCreation{resource, acl})
	request := &CreateAclsRequest{AclCreations: acls}

	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
		request.Version = 1
	}

	b, err := ca.Controller()
	if err != nil {
		return err
	}

	_, err = b.CreateAcls(request)
	return err
}
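
// Usage sketch for CreateACL. The field and constant names below are assumed
// to come from the package's ACL types (they are not defined in this file);
// the sketch grants a placeholder principal write access to a topic.
//
//	err := admin.CreateACL(
//		Resource{ResourceType: AclResourceTopic, ResourceName: "my-topic"},
//		Acl{
//			Principal:      "User:alice",
//			Host:           "*",
//			Operation:      AclOperationWrite,
//			PermissionType: AclPermissionAllow,
//		},
//	)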

func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
	request := &DescribeAclsRequest{AclFilter: filter}

	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
		request.Version = 1
	}

	b, err := ca.Controller()
	if err != nil {
		return nil, err
	}

	rsp, err := b.DescribeAcls(request)
	if err != nil {
		return nil, err
	}

	var lAcls []ResourceAcls
	for _, rAcl := range rsp.ResourceAcls {
		lAcls = append(lAcls, *rAcl)
	}
	return lAcls, nil
}

func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
	var filters []*AclFilter
	filters = append(filters, &filter)
	request := &DeleteAclsRequest{Filters: filters}

	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
		request.Version = 1
	}

	b, err := ca.Controller()
	if err != nil {
		return nil, err
	}

	rsp, err := b.DeleteAcls(request)
	if err != nil {
		return nil, err
	}

	var mAcls []MatchingAcl
	for _, fr := range rsp.FilterResponses {
		for _, mACL := range fr.MatchingAcls {
			mAcls = append(mAcls, *mACL)
		}
	}
	return mAcls, nil
}

func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
	groupsPerBroker := make(map[*Broker][]string)

	for _, group := range groups {
		coordinator, err := ca.client.Coordinator(group)
		if err != nil {
			return nil, err
		}
		groupsPerBroker[coordinator] = append(groupsPerBroker[coordinator], group)
	}

	for broker, brokerGroups := range groupsPerBroker {
		response, err := broker.DescribeGroups(&DescribeGroupsRequest{
			Groups: brokerGroups,
		})
		if err != nil {
			return nil, err
		}
		result = append(result, response.Groups...)
	}
	return result, nil
}

func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
	allGroups = make(map[string]string)

	// Query brokers in parallel, since we have to query *all* brokers
	brokers := ca.client.Brokers()
	groupMaps := make(chan map[string]string, len(brokers))
	errors := make(chan error, len(brokers))
	wg := sync.WaitGroup{}

	for _, b := range brokers {
		wg.Add(1)
		go func(b *Broker, conf *Config) {
			defer wg.Done()
			_ = b.Open(conf) // Ensure that broker is opened

			response, err := b.ListGroups(&ListGroupsRequest{})
			if err != nil {
				errors <- err
				return
			}

			groups := make(map[string]string)
			for group, typ := range response.Groups {
				groups[group] = typ
			}

			groupMaps <- groups
		}(b, ca.conf)
	}

	wg.Wait()
	close(groupMaps)
	close(errors)

	for groupMap := range groupMaps {
		for group, protocolType := range groupMap {
			allGroups[group] = protocolType
		}
	}

	// Intentionally return only the first error for simplicity
	err = <-errors
	return
}
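
// Usage sketch for ListConsumerGroups: the result maps group IDs to their
// protocol type.
//
//	groups, err := admin.ListConsumerGroups()
//	if err == nil {
//		for group, protocolType := range groups {
//			fmt.Println(group, protocolType)
//		}
//	}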

func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
	coordinator, err := ca.client.Coordinator(group)
	if err != nil {
		return nil, err
	}

	request := &OffsetFetchRequest{
		ConsumerGroup: group,
		partitions:    topicPartitions,
	}

	if ca.conf.Version.IsAtLeast(V0_10_2_0) {
		request.Version = 2
	} else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
		request.Version = 1
	}

	return coordinator.FetchOffset(request)
}
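
// Usage sketch for ListConsumerGroupOffsets: fetch committed offsets for two
// partitions of a placeholder topic (the semantics of a nil partition map
// depend on the request version, so this sketch lists partitions explicitly).
//
//	resp, err := admin.ListConsumerGroupOffsets("my-group", map[string][]int32{
//		"my-topic": {0, 1},
//	})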

func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
	coordinator, err := ca.client.Coordinator(group)
	if err != nil {
		return err
	}

	request := &DeleteGroupsRequest{
		Groups: []string{group},
	}

	resp, err := coordinator.DeleteGroups(request)
	if err != nil {
		return err
	}

	groupErr, ok := resp.GroupErrorCodes[group]
	if !ok {
		return ErrIncompleteResponse
	}

	if groupErr != ErrNoError {
		return groupErr
	}

	return nil
}