admin.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934
  1. package sarama
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/rand"
  6. "strconv"
  7. "sync"
  8. "time"
  9. )
  10. // ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
  11. // brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
  12. // Methods with stricter requirements will specify the minimum broker version required.
  13. // You MUST call Close() on a client to avoid leaks
  14. type ClusterAdmin interface {
  15. // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
  16. // It may take several seconds after CreateTopic returns success for all the brokers
  17. // to become aware that the topic has been created. During this time, listTopics
  18. // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
  19. CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
  20. // List the topics available in the cluster with the default options.
  21. ListTopics() (map[string]TopicDetail, error)
  22. // Describe some topics in the cluster.
  23. DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
  24. // Delete a topic. It may take several seconds after the DeleteTopic to returns success
  25. // and for all the brokers to become aware that the topics are gone.
  26. // During this time, listTopics may continue to return information about the deleted topic.
  27. // If delete.topic.enable is false on the brokers, deleteTopic will mark
  28. // the topic for deletion, but not actually delete them.
  29. // This operation is supported by brokers with version 0.10.1.0 or higher.
  30. DeleteTopic(topic string) error
  31. // Increase the number of partitions of the topics according to the corresponding values.
  32. // If partitions are increased for a topic that has a key, the partition logic or ordering of
  33. // the messages will be affected. It may take several seconds after this method returns
  34. // success for all the brokers to become aware that the partitions have been created.
  35. // During this time, ClusterAdmin#describeTopics may not return information about the
  36. // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
  37. CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
  38. // Alter the replica assignment for partitions.
  39. // This operation is supported by brokers with version 2.4.0.0 or higher.
  40. AlterPartitionReassignments(topic string, assignment [][]int32) error
  41. // Provides info on ongoing partitions replica reassignments.
  42. // This operation is supported by brokers with version 2.4.0.0 or higher.
  43. ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)
  44. // Delete records whose offset is smaller than the given offset of the corresponding partition.
  45. // This operation is supported by brokers with version 0.11.0.0 or higher.
  46. DeleteRecords(topic string, partitionOffsets map[int32]int64) error
  47. // Get the configuration for the specified resources.
  48. // The returned configuration includes default values and the Default is true
  49. // can be used to distinguish them from user supplied values.
  50. // Config entries where ReadOnly is true cannot be updated.
  51. // The value of config entries where Sensitive is true is always nil so
  52. // sensitive information is not disclosed.
  53. // This operation is supported by brokers with version 0.11.0.0 or higher.
  54. DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
  55. // Update the configuration for the specified resources with the default options.
  56. // This operation is supported by brokers with version 0.11.0.0 or higher.
  57. // The resources with their configs (topic is the only resource type with configs
  58. // that can be updated currently Updates are not transactional so they may succeed
  59. // for some resources while fail for others. The configs for a particular resource are updated automatically.
  60. AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
  61. // Creates access control lists (ACLs) which are bound to specific resources.
  62. // This operation is not transactional so it may succeed for some ACLs while fail for others.
  63. // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
  64. // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
  65. CreateACL(resource Resource, acl Acl) error
  66. // Lists access control lists (ACLs) according to the supplied filter.
  67. // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
  68. // This operation is supported by brokers with version 0.11.0.0 or higher.
  69. ListAcls(filter AclFilter) ([]ResourceAcls, error)
  70. // Deletes access control lists (ACLs) according to the supplied filters.
  71. // This operation is not transactional so it may succeed for some ACLs while fail for others.
  72. // This operation is supported by brokers with version 0.11.0.0 or higher.
  73. DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
  74. // List the consumer groups available in the cluster.
  75. ListConsumerGroups() (map[string]string, error)
  76. // Describe the given consumer groups.
  77. DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
  78. // List the consumer group offsets available in the cluster.
  79. ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
  80. // Delete a consumer group.
  81. DeleteConsumerGroup(group string) error
  82. // Get information about the nodes in the cluster
  83. DescribeCluster() (brokers []*Broker, controllerID int32, err error)
  84. // Get information about all log directories on the given set of brokers
  85. DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)
  86. // Close shuts down the admin and closes underlying client.
  87. Close() error
  88. }
  89. type clusterAdmin struct {
  90. client Client
  91. conf *Config
  92. }
  93. // NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
  94. func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
  95. client, err := NewClient(addrs, conf)
  96. if err != nil {
  97. return nil, err
  98. }
  99. return NewClusterAdminFromClient(client)
  100. }
  101. // NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
  102. // Note that underlying client will also be closed on admin's Close() call.
  103. func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
  104. //make sure we can retrieve the controller
  105. _, err := client.Controller()
  106. if err != nil {
  107. return nil, err
  108. }
  109. ca := &clusterAdmin{
  110. client: client,
  111. conf: client.Config(),
  112. }
  113. return ca, nil
  114. }
  115. func (ca *clusterAdmin) Close() error {
  116. return ca.client.Close()
  117. }
  118. func (ca *clusterAdmin) Controller() (*Broker, error) {
  119. return ca.client.Controller()
  120. }
  121. func (ca *clusterAdmin) refreshController() (*Broker, error) {
  122. return ca.client.RefreshController()
  123. }
  124. // isErrNoController returns `true` if the given error type unwraps to an
  125. // `ErrNotController` response from Kafka
  126. func isErrNoController(err error) bool {
  127. switch e := err.(type) {
  128. case *TopicError:
  129. return e.Err == ErrNotController
  130. case *TopicPartitionError:
  131. return e.Err == ErrNotController
  132. case KError:
  133. return e == ErrNotController
  134. }
  135. return false
  136. }
  137. // retryOnError will repeatedly call the given (error-returning) func in the
  138. // case that its response is non-nil and retriable (as determined by the
  139. // provided retriable func) up to the maximum number of tries permitted by
  140. // the admin client configuration
  141. func (ca *clusterAdmin) retryOnError(retriable func(error) bool, fn func() error) error {
  142. var err error
  143. for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
  144. err = fn()
  145. if err == nil || !retriable(err) {
  146. return err
  147. }
  148. Logger.Printf(
  149. "admin/request retrying after %dms... (%d attempts remaining)\n",
  150. ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
  151. time.Sleep(ca.conf.Admin.Retry.Backoff)
  152. continue
  153. }
  154. return err
  155. }
  156. func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
  157. if topic == "" {
  158. return ErrInvalidTopic
  159. }
  160. if detail == nil {
  161. return errors.New("you must specify topic details")
  162. }
  163. topicDetails := make(map[string]*TopicDetail)
  164. topicDetails[topic] = detail
  165. request := &CreateTopicsRequest{
  166. TopicDetails: topicDetails,
  167. ValidateOnly: validateOnly,
  168. Timeout: ca.conf.Admin.Timeout,
  169. }
  170. if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  171. request.Version = 1
  172. }
  173. if ca.conf.Version.IsAtLeast(V1_0_0_0) {
  174. request.Version = 2
  175. }
  176. return ca.retryOnError(isErrNoController, func() error {
  177. b, err := ca.Controller()
  178. if err != nil {
  179. return err
  180. }
  181. rsp, err := b.CreateTopics(request)
  182. if err != nil {
  183. return err
  184. }
  185. topicErr, ok := rsp.TopicErrors[topic]
  186. if !ok {
  187. return ErrIncompleteResponse
  188. }
  189. if topicErr.Err != ErrNoError {
  190. if topicErr.Err == ErrNotController {
  191. _, _ = ca.refreshController()
  192. }
  193. return topicErr
  194. }
  195. return nil
  196. })
  197. }
  198. func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
  199. controller, err := ca.Controller()
  200. if err != nil {
  201. return nil, err
  202. }
  203. request := &MetadataRequest{
  204. Topics: topics,
  205. AllowAutoTopicCreation: false,
  206. }
  207. if ca.conf.Version.IsAtLeast(V1_0_0_0) {
  208. request.Version = 5
  209. } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  210. request.Version = 4
  211. }
  212. response, err := controller.GetMetadata(request)
  213. if err != nil {
  214. return nil, err
  215. }
  216. return response.Topics, nil
  217. }
  218. func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
  219. controller, err := ca.Controller()
  220. if err != nil {
  221. return nil, int32(0), err
  222. }
  223. request := &MetadataRequest{
  224. Topics: []string{},
  225. }
  226. if ca.conf.Version.IsAtLeast(V0_10_0_0) {
  227. request.Version = 1
  228. }
  229. response, err := controller.GetMetadata(request)
  230. if err != nil {
  231. return nil, int32(0), err
  232. }
  233. return response.Brokers, response.ControllerID, nil
  234. }
  235. func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
  236. brokers := ca.client.Brokers()
  237. for _, b := range brokers {
  238. if b.ID() == id {
  239. return b, nil
  240. }
  241. }
  242. return nil, fmt.Errorf("could not find broker id %d", id)
  243. }
  244. func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
  245. brokers := ca.client.Brokers()
  246. if len(brokers) > 0 {
  247. index := rand.Intn(len(brokers))
  248. return brokers[index], nil
  249. }
  250. return nil, errors.New("no available broker")
  251. }
  252. func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
  253. // In order to build TopicDetails we need to first get the list of all
  254. // topics using a MetadataRequest and then get their configs using a
  255. // DescribeConfigsRequest request. To avoid sending many requests to the
  256. // broker, we use a single DescribeConfigsRequest.
  257. // Send the all-topic MetadataRequest
  258. b, err := ca.findAnyBroker()
  259. if err != nil {
  260. return nil, err
  261. }
  262. _ = b.Open(ca.client.Config())
  263. metadataReq := &MetadataRequest{}
  264. metadataResp, err := b.GetMetadata(metadataReq)
  265. if err != nil {
  266. return nil, err
  267. }
  268. topicsDetailsMap := make(map[string]TopicDetail)
  269. var describeConfigsResources []*ConfigResource
  270. for _, topic := range metadataResp.Topics {
  271. topicDetails := TopicDetail{
  272. NumPartitions: int32(len(topic.Partitions)),
  273. }
  274. if len(topic.Partitions) > 0 {
  275. topicDetails.ReplicaAssignment = map[int32][]int32{}
  276. for _, partition := range topic.Partitions {
  277. topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
  278. }
  279. topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
  280. }
  281. topicsDetailsMap[topic.Name] = topicDetails
  282. // we populate the resources we want to describe from the MetadataResponse
  283. topicResource := ConfigResource{
  284. Type: TopicResource,
  285. Name: topic.Name,
  286. }
  287. describeConfigsResources = append(describeConfigsResources, &topicResource)
  288. }
  289. // Send the DescribeConfigsRequest
  290. describeConfigsReq := &DescribeConfigsRequest{
  291. Resources: describeConfigsResources,
  292. }
  293. if ca.conf.Version.IsAtLeast(V1_1_0_0) {
  294. describeConfigsReq.Version = 1
  295. }
  296. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  297. describeConfigsReq.Version = 2
  298. }
  299. describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
  300. if err != nil {
  301. return nil, err
  302. }
  303. for _, resource := range describeConfigsResp.Resources {
  304. topicDetails := topicsDetailsMap[resource.Name]
  305. topicDetails.ConfigEntries = make(map[string]*string)
  306. for _, entry := range resource.Configs {
  307. // only include non-default non-sensitive config
  308. // (don't actually think topic config will ever be sensitive)
  309. if entry.Default || entry.Sensitive {
  310. continue
  311. }
  312. topicDetails.ConfigEntries[entry.Name] = &entry.Value
  313. }
  314. topicsDetailsMap[resource.Name] = topicDetails
  315. }
  316. return topicsDetailsMap, nil
  317. }
  318. func (ca *clusterAdmin) DeleteTopic(topic string) error {
  319. if topic == "" {
  320. return ErrInvalidTopic
  321. }
  322. request := &DeleteTopicsRequest{
  323. Topics: []string{topic},
  324. Timeout: ca.conf.Admin.Timeout,
  325. }
  326. if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  327. request.Version = 1
  328. }
  329. return ca.retryOnError(isErrNoController, func() error {
  330. b, err := ca.Controller()
  331. if err != nil {
  332. return err
  333. }
  334. rsp, err := b.DeleteTopics(request)
  335. if err != nil {
  336. return err
  337. }
  338. topicErr, ok := rsp.TopicErrorCodes[topic]
  339. if !ok {
  340. return ErrIncompleteResponse
  341. }
  342. if topicErr != ErrNoError {
  343. if topicErr == ErrNotController {
  344. _, _ = ca.refreshController()
  345. }
  346. return topicErr
  347. }
  348. return nil
  349. })
  350. }
  351. func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
  352. if topic == "" {
  353. return ErrInvalidTopic
  354. }
  355. topicPartitions := make(map[string]*TopicPartition)
  356. topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
  357. request := &CreatePartitionsRequest{
  358. TopicPartitions: topicPartitions,
  359. Timeout: ca.conf.Admin.Timeout,
  360. }
  361. return ca.retryOnError(isErrNoController, func() error {
  362. b, err := ca.Controller()
  363. if err != nil {
  364. return err
  365. }
  366. rsp, err := b.CreatePartitions(request)
  367. if err != nil {
  368. return err
  369. }
  370. topicErr, ok := rsp.TopicPartitionErrors[topic]
  371. if !ok {
  372. return ErrIncompleteResponse
  373. }
  374. if topicErr.Err != ErrNoError {
  375. if topicErr.Err == ErrNotController {
  376. _, _ = ca.refreshController()
  377. }
  378. return topicErr
  379. }
  380. return nil
  381. })
  382. }
  383. func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
  384. if topic == "" {
  385. return ErrInvalidTopic
  386. }
  387. request := &AlterPartitionReassignmentsRequest{
  388. TimeoutMs: int32(60000),
  389. Version: int16(0),
  390. }
  391. for i := 0; i < len(assignment); i++ {
  392. request.AddBlock(topic, int32(i), assignment[i])
  393. }
  394. return ca.retryOnError(isErrNoController, func() error {
  395. b, err := ca.Controller()
  396. if err != nil {
  397. return err
  398. }
  399. errs := make([]error, 0)
  400. rsp, err := b.AlterPartitionReassignments(request)
  401. if err != nil {
  402. errs = append(errs, err)
  403. } else {
  404. if rsp.ErrorCode > 0 {
  405. errs = append(errs, errors.New(rsp.ErrorCode.Error()))
  406. }
  407. for topic, topicErrors := range rsp.Errors {
  408. for partition, partitionError := range topicErrors {
  409. if partitionError.errorCode != ErrNoError {
  410. errStr := fmt.Sprintf("[%s-%d]: %s", topic, partition, partitionError.errorCode.Error())
  411. errs = append(errs, errors.New(errStr))
  412. }
  413. }
  414. }
  415. }
  416. if len(errs) > 0 {
  417. return ErrReassignPartitions{MultiError{&errs}}
  418. }
  419. return nil
  420. })
  421. }
  422. func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
  423. if topic == "" {
  424. return nil, ErrInvalidTopic
  425. }
  426. request := &ListPartitionReassignmentsRequest{
  427. TimeoutMs: int32(60000),
  428. Version: int16(0),
  429. }
  430. request.AddBlock(topic, partitions)
  431. b, err := ca.Controller()
  432. if err != nil {
  433. return nil, err
  434. }
  435. _ = b.Open(ca.client.Config())
  436. rsp, err := b.ListPartitionReassignments(request)
  437. if err == nil && rsp != nil {
  438. return rsp.TopicStatus, nil
  439. } else {
  440. return nil, err
  441. }
  442. }
  443. func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
  444. if topic == "" {
  445. return ErrInvalidTopic
  446. }
  447. partitionPerBroker := make(map[*Broker][]int32)
  448. for partition := range partitionOffsets {
  449. broker, err := ca.client.Leader(topic, partition)
  450. if err != nil {
  451. return err
  452. }
  453. if _, ok := partitionPerBroker[broker]; ok {
  454. partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
  455. } else {
  456. partitionPerBroker[broker] = []int32{partition}
  457. }
  458. }
  459. errs := make([]error, 0)
  460. for broker, partitions := range partitionPerBroker {
  461. topics := make(map[string]*DeleteRecordsRequestTopic)
  462. recordsToDelete := make(map[int32]int64)
  463. for _, p := range partitions {
  464. recordsToDelete[p] = partitionOffsets[p]
  465. }
  466. topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
  467. request := &DeleteRecordsRequest{
  468. Topics: topics,
  469. Timeout: ca.conf.Admin.Timeout,
  470. }
  471. rsp, err := broker.DeleteRecords(request)
  472. if err != nil {
  473. errs = append(errs, err)
  474. } else {
  475. deleteRecordsResponseTopic, ok := rsp.Topics[topic]
  476. if !ok {
  477. errs = append(errs, ErrIncompleteResponse)
  478. } else {
  479. for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
  480. if deleteRecordsResponsePartition.Err != ErrNoError {
  481. errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
  482. }
  483. }
  484. }
  485. }
  486. }
  487. if len(errs) > 0 {
  488. return ErrDeleteRecords{MultiError{&errs}}
  489. }
  490. //todo since we are dealing with couple of partitions it would be good if we return slice of errors
  491. //for each partition instead of one error
  492. return nil
  493. }
  494. // Returns a bool indicating whether the resource request needs to go to a
  495. // specific broker
  496. func dependsOnSpecificNode(resource ConfigResource) bool {
  497. return (resource.Type == BrokerResource && resource.Name != "") ||
  498. resource.Type == BrokerLoggerResource
  499. }
  500. func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
  501. var entries []ConfigEntry
  502. var resources []*ConfigResource
  503. resources = append(resources, &resource)
  504. request := &DescribeConfigsRequest{
  505. Resources: resources,
  506. }
  507. if ca.conf.Version.IsAtLeast(V1_1_0_0) {
  508. request.Version = 1
  509. }
  510. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  511. request.Version = 2
  512. }
  513. var (
  514. b *Broker
  515. err error
  516. )
  517. // DescribeConfig of broker/broker logger must be sent to the broker in question
  518. if dependsOnSpecificNode(resource) {
  519. id, _ := strconv.Atoi(resource.Name)
  520. b, err = ca.findBroker(int32(id))
  521. } else {
  522. b, err = ca.findAnyBroker()
  523. }
  524. if err != nil {
  525. return nil, err
  526. }
  527. _ = b.Open(ca.client.Config())
  528. rsp, err := b.DescribeConfigs(request)
  529. if err != nil {
  530. return nil, err
  531. }
  532. for _, rspResource := range rsp.Resources {
  533. if rspResource.Name == resource.Name {
  534. if rspResource.ErrorMsg != "" {
  535. return nil, errors.New(rspResource.ErrorMsg)
  536. }
  537. if rspResource.ErrorCode != 0 {
  538. return nil, KError(rspResource.ErrorCode)
  539. }
  540. for _, cfgEntry := range rspResource.Configs {
  541. entries = append(entries, *cfgEntry)
  542. }
  543. }
  544. }
  545. return entries, nil
  546. }
  547. func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
  548. var resources []*AlterConfigsResource
  549. resources = append(resources, &AlterConfigsResource{
  550. Type: resourceType,
  551. Name: name,
  552. ConfigEntries: entries,
  553. })
  554. request := &AlterConfigsRequest{
  555. Resources: resources,
  556. ValidateOnly: validateOnly,
  557. }
  558. var (
  559. b *Broker
  560. err error
  561. )
  562. // AlterConfig of broker/broker logger must be sent to the broker in question
  563. if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
  564. id, _ := strconv.Atoi(name)
  565. b, err = ca.findBroker(int32(id))
  566. } else {
  567. b, err = ca.findAnyBroker()
  568. }
  569. if err != nil {
  570. return err
  571. }
  572. _ = b.Open(ca.client.Config())
  573. rsp, err := b.AlterConfigs(request)
  574. if err != nil {
  575. return err
  576. }
  577. for _, rspResource := range rsp.Resources {
  578. if rspResource.Name == name {
  579. if rspResource.ErrorMsg != "" {
  580. return errors.New(rspResource.ErrorMsg)
  581. }
  582. if rspResource.ErrorCode != 0 {
  583. return KError(rspResource.ErrorCode)
  584. }
  585. }
  586. }
  587. return nil
  588. }
  589. func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
  590. var acls []*AclCreation
  591. acls = append(acls, &AclCreation{resource, acl})
  592. request := &CreateAclsRequest{AclCreations: acls}
  593. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  594. request.Version = 1
  595. }
  596. b, err := ca.Controller()
  597. if err != nil {
  598. return err
  599. }
  600. _, err = b.CreateAcls(request)
  601. return err
  602. }
  603. func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
  604. request := &DescribeAclsRequest{AclFilter: filter}
  605. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  606. request.Version = 1
  607. }
  608. b, err := ca.Controller()
  609. if err != nil {
  610. return nil, err
  611. }
  612. rsp, err := b.DescribeAcls(request)
  613. if err != nil {
  614. return nil, err
  615. }
  616. var lAcls []ResourceAcls
  617. for _, rAcl := range rsp.ResourceAcls {
  618. lAcls = append(lAcls, *rAcl)
  619. }
  620. return lAcls, nil
  621. }
  622. func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
  623. var filters []*AclFilter
  624. filters = append(filters, &filter)
  625. request := &DeleteAclsRequest{Filters: filters}
  626. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  627. request.Version = 1
  628. }
  629. b, err := ca.Controller()
  630. if err != nil {
  631. return nil, err
  632. }
  633. rsp, err := b.DeleteAcls(request)
  634. if err != nil {
  635. return nil, err
  636. }
  637. var mAcls []MatchingAcl
  638. for _, fr := range rsp.FilterResponses {
  639. for _, mACL := range fr.MatchingAcls {
  640. mAcls = append(mAcls, *mACL)
  641. }
  642. }
  643. return mAcls, nil
  644. }
  645. func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
  646. groupsPerBroker := make(map[*Broker][]string)
  647. for _, group := range groups {
  648. controller, err := ca.client.Coordinator(group)
  649. if err != nil {
  650. return nil, err
  651. }
  652. groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
  653. }
  654. for broker, brokerGroups := range groupsPerBroker {
  655. response, err := broker.DescribeGroups(&DescribeGroupsRequest{
  656. Groups: brokerGroups,
  657. })
  658. if err != nil {
  659. return nil, err
  660. }
  661. result = append(result, response.Groups...)
  662. }
  663. return result, nil
  664. }
  665. func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
  666. allGroups = make(map[string]string)
  667. // Query brokers in parallel, since we have to query *all* brokers
  668. brokers := ca.client.Brokers()
  669. groupMaps := make(chan map[string]string, len(brokers))
  670. errors := make(chan error, len(brokers))
  671. wg := sync.WaitGroup{}
  672. for _, b := range brokers {
  673. wg.Add(1)
  674. go func(b *Broker, conf *Config) {
  675. defer wg.Done()
  676. _ = b.Open(conf) // Ensure that broker is opened
  677. response, err := b.ListGroups(&ListGroupsRequest{})
  678. if err != nil {
  679. errors <- err
  680. return
  681. }
  682. groups := make(map[string]string)
  683. for group, typ := range response.Groups {
  684. groups[group] = typ
  685. }
  686. groupMaps <- groups
  687. }(b, ca.conf)
  688. }
  689. wg.Wait()
  690. close(groupMaps)
  691. close(errors)
  692. for groupMap := range groupMaps {
  693. for group, protocolType := range groupMap {
  694. allGroups[group] = protocolType
  695. }
  696. }
  697. // Intentionally return only the first error for simplicity
  698. err = <-errors
  699. return
  700. }
  701. func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
  702. coordinator, err := ca.client.Coordinator(group)
  703. if err != nil {
  704. return nil, err
  705. }
  706. request := &OffsetFetchRequest{
  707. ConsumerGroup: group,
  708. partitions: topicPartitions,
  709. }
  710. if ca.conf.Version.IsAtLeast(V0_10_2_0) {
  711. request.Version = 2
  712. } else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
  713. request.Version = 1
  714. }
  715. return coordinator.FetchOffset(request)
  716. }
  717. func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
  718. coordinator, err := ca.client.Coordinator(group)
  719. if err != nil {
  720. return err
  721. }
  722. request := &DeleteGroupsRequest{
  723. Groups: []string{group},
  724. }
  725. resp, err := coordinator.DeleteGroups(request)
  726. if err != nil {
  727. return err
  728. }
  729. groupErr, ok := resp.GroupErrorCodes[group]
  730. if !ok {
  731. return ErrIncompleteResponse
  732. }
  733. if groupErr != ErrNoError {
  734. return groupErr
  735. }
  736. return nil
  737. }
  738. func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
  739. allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)
  740. // Query brokers in parallel, since we may have to query multiple brokers
  741. logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
  742. errors := make(chan error, len(brokerIds))
  743. wg := sync.WaitGroup{}
  744. for _, b := range brokerIds {
  745. wg.Add(1)
  746. broker, err := ca.findBroker(b)
  747. if err != nil {
  748. Logger.Printf("Unable to find broker with ID = %v\n", b)
  749. continue
  750. }
  751. go func(b *Broker, conf *Config) {
  752. defer wg.Done()
  753. _ = b.Open(conf) // Ensure that broker is opened
  754. response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
  755. if err != nil {
  756. errors <- err
  757. return
  758. }
  759. logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
  760. logDirs[b.ID()] = response.LogDirs
  761. logDirsMaps <- logDirs
  762. }(broker, ca.conf)
  763. }
  764. wg.Wait()
  765. close(logDirsMaps)
  766. close(errors)
  767. for logDirsMap := range logDirsMaps {
  768. for id, logDirs := range logDirsMap {
  769. allLogDirs[id] = logDirs
  770. }
  771. }
  772. // Intentionally return only the first error for simplicity
  773. err = <-errors
  774. return
  775. }