admin.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740
  1. package sarama
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/rand"
  6. "strconv"
  7. "sync"
  8. )
  9. // ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
  10. // brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
  11. // Methods with stricter requirements will specify the minimum broker version required.
  12. // You MUST call Close() on a client to avoid leaks
  13. type ClusterAdmin interface {
  14. // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
  15. // It may take several seconds after CreateTopic returns success for all the brokers
  16. // to become aware that the topic has been created. During this time, listTopics
  17. // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
  18. CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
  19. // List the topics available in the cluster with the default options.
  20. ListTopics() (map[string]TopicDetail, error)
  21. // Describe some topics in the cluster.
  22. DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
  23. // Delete a topic. It may take several seconds after the DeleteTopic to returns success
  24. // and for all the brokers to become aware that the topics are gone.
  25. // During this time, listTopics may continue to return information about the deleted topic.
  26. // If delete.topic.enable is false on the brokers, deleteTopic will mark
  27. // the topic for deletion, but not actually delete them.
  28. // This operation is supported by brokers with version 0.10.1.0 or higher.
  29. DeleteTopic(topic string) error
  30. // Increase the number of partitions of the topics according to the corresponding values.
  31. // If partitions are increased for a topic that has a key, the partition logic or ordering of
  32. // the messages will be affected. It may take several seconds after this method returns
  33. // success for all the brokers to become aware that the partitions have been created.
  34. // During this time, ClusterAdmin#describeTopics may not return information about the
  35. // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
  36. CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
  37. // Delete records whose offset is smaller than the given offset of the corresponding partition.
  38. // This operation is supported by brokers with version 0.11.0.0 or higher.
  39. DeleteRecords(topic string, partitionOffsets map[int32]int64) error
  40. // Get the configuration for the specified resources.
  41. // The returned configuration includes default values and the Default is true
  42. // can be used to distinguish them from user supplied values.
  43. // Config entries where ReadOnly is true cannot be updated.
  44. // The value of config entries where Sensitive is true is always nil so
  45. // sensitive information is not disclosed.
  46. // This operation is supported by brokers with version 0.11.0.0 or higher.
  47. DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
  48. // Update the configuration for the specified resources with the default options.
  49. // This operation is supported by brokers with version 0.11.0.0 or higher.
  50. // The resources with their configs (topic is the only resource type with configs
  51. // that can be updated currently Updates are not transactional so they may succeed
  52. // for some resources while fail for others. The configs for a particular resource are updated automatically.
  53. AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
  54. // Creates access control lists (ACLs) which are bound to specific resources.
  55. // This operation is not transactional so it may succeed for some ACLs while fail for others.
  56. // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
  57. // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
  58. CreateACL(resource Resource, acl Acl) error
  59. // Lists access control lists (ACLs) according to the supplied filter.
  60. // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
  61. // This operation is supported by brokers with version 0.11.0.0 or higher.
  62. ListAcls(filter AclFilter) ([]ResourceAcls, error)
  63. // Deletes access control lists (ACLs) according to the supplied filters.
  64. // This operation is not transactional so it may succeed for some ACLs while fail for others.
  65. // This operation is supported by brokers with version 0.11.0.0 or higher.
  66. DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
  67. // List the consumer groups available in the cluster.
  68. ListConsumerGroups() (map[string]string, error)
  69. // Describe the given consumer groups.
  70. DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
  71. // List the consumer group offsets available in the cluster.
  72. ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
  73. // Delete a consumer group.
  74. DeleteConsumerGroup(group string) error
  75. // Get information about the nodes in the cluster
  76. DescribeCluster() (brokers []*Broker, controllerID int32, err error)
  77. // Close shuts down the admin and closes underlying client.
  78. Close() error
  79. }
  80. type clusterAdmin struct {
  81. client Client
  82. conf *Config
  83. }
  84. // NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
  85. func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
  86. client, err := NewClient(addrs, conf)
  87. if err != nil {
  88. return nil, err
  89. }
  90. return NewClusterAdminFromClient(client)
  91. }
  92. // NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
  93. // Note that underlying client will also be closed on admin's Close() call.
  94. func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
  95. //make sure we can retrieve the controller
  96. _, err := client.Controller()
  97. if err != nil {
  98. return nil, err
  99. }
  100. ca := &clusterAdmin{
  101. client: client,
  102. conf: client.Config(),
  103. }
  104. return ca, nil
  105. }
  106. func (ca *clusterAdmin) Close() error {
  107. return ca.client.Close()
  108. }
  109. func (ca *clusterAdmin) Controller() (*Broker, error) {
  110. return ca.client.Controller()
  111. }
  112. func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
  113. if topic == "" {
  114. return ErrInvalidTopic
  115. }
  116. if detail == nil {
  117. return errors.New("you must specify topic details")
  118. }
  119. topicDetails := make(map[string]*TopicDetail)
  120. topicDetails[topic] = detail
  121. request := &CreateTopicsRequest{
  122. TopicDetails: topicDetails,
  123. ValidateOnly: validateOnly,
  124. Timeout: ca.conf.Admin.Timeout,
  125. }
  126. if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  127. request.Version = 1
  128. }
  129. if ca.conf.Version.IsAtLeast(V1_0_0_0) {
  130. request.Version = 2
  131. }
  132. b, err := ca.Controller()
  133. if err != nil {
  134. return err
  135. }
  136. rsp, err := b.CreateTopics(request)
  137. if err != nil {
  138. return err
  139. }
  140. topicErr, ok := rsp.TopicErrors[topic]
  141. if !ok {
  142. return ErrIncompleteResponse
  143. }
  144. if topicErr.Err != ErrNoError {
  145. return topicErr
  146. }
  147. return nil
  148. }
  149. func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
  150. controller, err := ca.Controller()
  151. if err != nil {
  152. return nil, err
  153. }
  154. request := &MetadataRequest{
  155. Topics: topics,
  156. AllowAutoTopicCreation: false,
  157. }
  158. if ca.conf.Version.IsAtLeast(V1_0_0_0) {
  159. request.Version = 5
  160. } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  161. request.Version = 4
  162. }
  163. response, err := controller.GetMetadata(request)
  164. if err != nil {
  165. return nil, err
  166. }
  167. return response.Topics, nil
  168. }
  169. func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
  170. controller, err := ca.Controller()
  171. if err != nil {
  172. return nil, int32(0), err
  173. }
  174. request := &MetadataRequest{
  175. Topics: []string{},
  176. }
  177. if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  178. request.Version = 1
  179. }
  180. response, err := controller.GetMetadata(request)
  181. if err != nil {
  182. return nil, int32(0), err
  183. }
  184. return response.Brokers, response.ControllerID, nil
  185. }
  186. func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
  187. brokers := ca.client.Brokers()
  188. for _, b := range brokers {
  189. if b.ID() == id {
  190. return b, nil
  191. }
  192. }
  193. return nil, fmt.Errorf("could not find broker id %d", id)
  194. }
  195. func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
  196. brokers := ca.client.Brokers()
  197. if len(brokers) > 0 {
  198. index := rand.Intn(len(brokers))
  199. return brokers[index], nil
  200. }
  201. return nil, errors.New("no available broker")
  202. }
  203. func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
  204. // In order to build TopicDetails we need to first get the list of all
  205. // topics using a MetadataRequest and then get their configs using a
  206. // DescribeConfigsRequest request. To avoid sending many requests to the
  207. // broker, we use a single DescribeConfigsRequest.
  208. // Send the all-topic MetadataRequest
  209. b, err := ca.findAnyBroker()
  210. if err != nil {
  211. return nil, err
  212. }
  213. _ = b.Open(ca.client.Config())
  214. metadataReq := &MetadataRequest{}
  215. metadataResp, err := b.GetMetadata(metadataReq)
  216. if err != nil {
  217. return nil, err
  218. }
  219. topicsDetailsMap := make(map[string]TopicDetail)
  220. var describeConfigsResources []*ConfigResource
  221. for _, topic := range metadataResp.Topics {
  222. topicDetails := TopicDetail{
  223. NumPartitions: int32(len(topic.Partitions)),
  224. }
  225. if len(topic.Partitions) > 0 {
  226. topicDetails.ReplicaAssignment = map[int32][]int32{}
  227. for _, partition := range topic.Partitions {
  228. topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
  229. }
  230. topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
  231. }
  232. topicsDetailsMap[topic.Name] = topicDetails
  233. // we populate the resources we want to describe from the MetadataResponse
  234. topicResource := ConfigResource{
  235. Type: TopicResource,
  236. Name: topic.Name,
  237. }
  238. describeConfigsResources = append(describeConfigsResources, &topicResource)
  239. }
  240. // Send the DescribeConfigsRequest
  241. describeConfigsReq := &DescribeConfigsRequest{
  242. Resources: describeConfigsResources,
  243. }
  244. describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
  245. if err != nil {
  246. return nil, err
  247. }
  248. for _, resource := range describeConfigsResp.Resources {
  249. topicDetails := topicsDetailsMap[resource.Name]
  250. topicDetails.ConfigEntries = make(map[string]*string)
  251. for _, entry := range resource.Configs {
  252. // only include non-default non-sensitive config
  253. // (don't actually think topic config will ever be sensitive)
  254. if entry.Default || entry.Sensitive {
  255. continue
  256. }
  257. topicDetails.ConfigEntries[entry.Name] = &entry.Value
  258. }
  259. topicsDetailsMap[resource.Name] = topicDetails
  260. }
  261. return topicsDetailsMap, nil
  262. }
  263. func (ca *clusterAdmin) DeleteTopic(topic string) error {
  264. if topic == "" {
  265. return ErrInvalidTopic
  266. }
  267. request := &DeleteTopicsRequest{
  268. Topics: []string{topic},
  269. Timeout: ca.conf.Admin.Timeout,
  270. }
  271. if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  272. request.Version = 1
  273. }
  274. b, err := ca.Controller()
  275. if err != nil {
  276. return err
  277. }
  278. rsp, err := b.DeleteTopics(request)
  279. if err != nil {
  280. return err
  281. }
  282. topicErr, ok := rsp.TopicErrorCodes[topic]
  283. if !ok {
  284. return ErrIncompleteResponse
  285. }
  286. if topicErr != ErrNoError {
  287. return topicErr
  288. }
  289. return nil
  290. }
  291. func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
  292. if topic == "" {
  293. return ErrInvalidTopic
  294. }
  295. topicPartitions := make(map[string]*TopicPartition)
  296. topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
  297. request := &CreatePartitionsRequest{
  298. TopicPartitions: topicPartitions,
  299. Timeout: ca.conf.Admin.Timeout,
  300. }
  301. b, err := ca.Controller()
  302. if err != nil {
  303. return err
  304. }
  305. rsp, err := b.CreatePartitions(request)
  306. if err != nil {
  307. return err
  308. }
  309. topicErr, ok := rsp.TopicPartitionErrors[topic]
  310. if !ok {
  311. return ErrIncompleteResponse
  312. }
  313. if topicErr.Err != ErrNoError {
  314. return topicErr
  315. }
  316. return nil
  317. }
  318. func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
  319. if topic == "" {
  320. return ErrInvalidTopic
  321. }
  322. partitionPerBroker := make(map[*Broker][]int32)
  323. for partition := range partitionOffsets {
  324. broker, err := ca.client.Leader(topic, partition)
  325. if err != nil {
  326. return err
  327. }
  328. if _, ok := partitionPerBroker[broker]; ok {
  329. partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
  330. } else {
  331. partitionPerBroker[broker] = []int32{partition}
  332. }
  333. }
  334. errs := make([]error, 0)
  335. for broker, partitions := range partitionPerBroker {
  336. topics := make(map[string]*DeleteRecordsRequestTopic)
  337. recordsToDelete := make(map[int32]int64)
  338. for _, p := range partitions {
  339. recordsToDelete[p] = partitionOffsets[p]
  340. }
  341. topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
  342. request := &DeleteRecordsRequest{
  343. Topics: topics,
  344. Timeout: ca.conf.Admin.Timeout,
  345. }
  346. rsp, err := broker.DeleteRecords(request)
  347. if err != nil {
  348. errs = append(errs, err)
  349. } else {
  350. deleteRecordsResponseTopic, ok := rsp.Topics[topic]
  351. if !ok {
  352. errs = append(errs, ErrIncompleteResponse)
  353. } else {
  354. for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
  355. if deleteRecordsResponsePartition.Err != ErrNoError {
  356. errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
  357. }
  358. }
  359. }
  360. }
  361. }
  362. if len(errs) > 0 {
  363. return ErrDeleteRecords{MultiError{&errs}}
  364. }
  365. //todo since we are dealing with couple of partitions it would be good if we return slice of errors
  366. //for each partition instead of one error
  367. return nil
  368. }
  369. // Returns a bool indicating whether the resource request needs to go to a
  370. // specific broker
  371. func dependsOnSpecificNode(resource ConfigResource) bool {
  372. return (resource.Type == BrokerResource && resource.Name != "") ||
  373. resource.Type == BrokerLoggerResource
  374. }
  375. func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
  376. var entries []ConfigEntry
  377. var resources []*ConfigResource
  378. resources = append(resources, &resource)
  379. request := &DescribeConfigsRequest{
  380. Resources: resources,
  381. }
  382. if ca.conf.Version.IsAtLeast(V1_1_0_0) {
  383. request.Version = 1
  384. }
  385. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  386. request.Version = 2
  387. }
  388. var (
  389. b *Broker
  390. err error
  391. )
  392. // DescribeConfig of broker/broker logger must be sent to the broker in question
  393. if dependsOnSpecificNode(resource) {
  394. id, _ := strconv.Atoi(resource.Name)
  395. b, err = ca.findBroker(int32(id))
  396. } else {
  397. b, err = ca.findAnyBroker()
  398. }
  399. if err != nil {
  400. return nil, err
  401. }
  402. _ = b.Open(ca.client.Config())
  403. rsp, err := b.DescribeConfigs(request)
  404. if err != nil {
  405. return nil, err
  406. }
  407. for _, rspResource := range rsp.Resources {
  408. if rspResource.Name == resource.Name {
  409. if rspResource.ErrorMsg != "" {
  410. return nil, errors.New(rspResource.ErrorMsg)
  411. }
  412. for _, cfgEntry := range rspResource.Configs {
  413. entries = append(entries, *cfgEntry)
  414. }
  415. }
  416. }
  417. return entries, nil
  418. }
  419. func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
  420. var resources []*AlterConfigsResource
  421. resources = append(resources, &AlterConfigsResource{
  422. Type: resourceType,
  423. Name: name,
  424. ConfigEntries: entries,
  425. })
  426. request := &AlterConfigsRequest{
  427. Resources: resources,
  428. ValidateOnly: validateOnly,
  429. }
  430. var (
  431. b *Broker
  432. err error
  433. )
  434. // AlterConfig of broker/broker logger must be sent to the broker in question
  435. if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
  436. id, _ := strconv.Atoi(name)
  437. b, err = ca.findBroker(int32(id))
  438. } else {
  439. b, err = ca.findAnyBroker()
  440. }
  441. if err != nil {
  442. return err
  443. }
  444. _ = b.Open(ca.client.Config())
  445. rsp, err := b.AlterConfigs(request)
  446. if err != nil {
  447. return err
  448. }
  449. for _, rspResource := range rsp.Resources {
  450. if rspResource.Name == name {
  451. if rspResource.ErrorMsg != "" {
  452. return errors.New(rspResource.ErrorMsg)
  453. }
  454. }
  455. }
  456. return nil
  457. }
  458. func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
  459. var acls []*AclCreation
  460. acls = append(acls, &AclCreation{resource, acl})
  461. request := &CreateAclsRequest{AclCreations: acls}
  462. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  463. request.Version = 1
  464. }
  465. b, err := ca.Controller()
  466. if err != nil {
  467. return err
  468. }
  469. _, err = b.CreateAcls(request)
  470. return err
  471. }
  472. func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
  473. request := &DescribeAclsRequest{AclFilter: filter}
  474. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  475. request.Version = 1
  476. }
  477. b, err := ca.Controller()
  478. if err != nil {
  479. return nil, err
  480. }
  481. rsp, err := b.DescribeAcls(request)
  482. if err != nil {
  483. return nil, err
  484. }
  485. var lAcls []ResourceAcls
  486. for _, rAcl := range rsp.ResourceAcls {
  487. lAcls = append(lAcls, *rAcl)
  488. }
  489. return lAcls, nil
  490. }
  491. func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
  492. var filters []*AclFilter
  493. filters = append(filters, &filter)
  494. request := &DeleteAclsRequest{Filters: filters}
  495. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  496. request.Version = 1
  497. }
  498. b, err := ca.Controller()
  499. if err != nil {
  500. return nil, err
  501. }
  502. rsp, err := b.DeleteAcls(request)
  503. if err != nil {
  504. return nil, err
  505. }
  506. var mAcls []MatchingAcl
  507. for _, fr := range rsp.FilterResponses {
  508. for _, mACL := range fr.MatchingAcls {
  509. mAcls = append(mAcls, *mACL)
  510. }
  511. }
  512. return mAcls, nil
  513. }
  514. func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
  515. groupsPerBroker := make(map[*Broker][]string)
  516. for _, group := range groups {
  517. controller, err := ca.client.Coordinator(group)
  518. if err != nil {
  519. return nil, err
  520. }
  521. groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
  522. }
  523. for broker, brokerGroups := range groupsPerBroker {
  524. response, err := broker.DescribeGroups(&DescribeGroupsRequest{
  525. Groups: brokerGroups,
  526. })
  527. if err != nil {
  528. return nil, err
  529. }
  530. result = append(result, response.Groups...)
  531. }
  532. return result, nil
  533. }
  534. func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
  535. allGroups = make(map[string]string)
  536. // Query brokers in parallel, since we have to query *all* brokers
  537. brokers := ca.client.Brokers()
  538. groupMaps := make(chan map[string]string, len(brokers))
  539. errors := make(chan error, len(brokers))
  540. wg := sync.WaitGroup{}
  541. for _, b := range brokers {
  542. wg.Add(1)
  543. go func(b *Broker, conf *Config) {
  544. defer wg.Done()
  545. _ = b.Open(conf) // Ensure that broker is opened
  546. response, err := b.ListGroups(&ListGroupsRequest{})
  547. if err != nil {
  548. errors <- err
  549. return
  550. }
  551. groups := make(map[string]string)
  552. for group, typ := range response.Groups {
  553. groups[group] = typ
  554. }
  555. groupMaps <- groups
  556. }(b, ca.conf)
  557. }
  558. wg.Wait()
  559. close(groupMaps)
  560. close(errors)
  561. for groupMap := range groupMaps {
  562. for group, protocolType := range groupMap {
  563. allGroups[group] = protocolType
  564. }
  565. }
  566. // Intentionally return only the first error for simplicity
  567. err = <-errors
  568. return
  569. }
  570. func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
  571. coordinator, err := ca.client.Coordinator(group)
  572. if err != nil {
  573. return nil, err
  574. }
  575. request := &OffsetFetchRequest{
  576. ConsumerGroup: group,
  577. partitions: topicPartitions,
  578. }
  579. if ca.conf.Version.IsAtLeast(V0_10_2_0) {
  580. request.Version = 2
  581. } else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
  582. request.Version = 1
  583. }
  584. return coordinator.FetchOffset(request)
  585. }
  586. func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
  587. coordinator, err := ca.client.Coordinator(group)
  588. if err != nil {
  589. return err
  590. }
  591. request := &DeleteGroupsRequest{
  592. Groups: []string{group},
  593. }
  594. resp, err := coordinator.DeleteGroups(request)
  595. if err != nil {
  596. return err
  597. }
  598. groupErr, ok := resp.GroupErrorCodes[group]
  599. if !ok {
  600. return ErrIncompleteResponse
  601. }
  602. if groupErr != ErrNoError {
  603. return groupErr
  604. }
  605. return nil
  606. }