admin.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690
  1. package sarama
  2. import (
  3. "errors"
  4. "math/rand"
  5. "sync"
  6. )
  7. // ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
  8. // brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
  9. // Methods with stricter requirements will specify the minimum broker version required.
  10. // You MUST call Close() on a client to avoid leaks
  11. type ClusterAdmin interface {
  12. // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
  13. // It may take several seconds after CreateTopic returns success for all the brokers
  14. // to become aware that the topic has been created. During this time, listTopics
  15. // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
  16. CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
  17. // List the topics available in the cluster with the default options.
  18. ListTopics() (map[string]TopicDetail, error)
  19. // Describe some topics in the cluster.
  20. DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
  21. // Delete a topic. It may take several seconds after the DeleteTopic to returns success
  22. // and for all the brokers to become aware that the topics are gone.
  23. // During this time, listTopics may continue to return information about the deleted topic.
  24. // If delete.topic.enable is false on the brokers, deleteTopic will mark
  25. // the topic for deletion, but not actually delete them.
  26. // This operation is supported by brokers with version 0.10.1.0 or higher.
  27. DeleteTopic(topic string) error
  28. // Increase the number of partitions of the topics according to the corresponding values.
  29. // If partitions are increased for a topic that has a key, the partition logic or ordering of
  30. // the messages will be affected. It may take several seconds after this method returns
  31. // success for all the brokers to become aware that the partitions have been created.
  32. // During this time, ClusterAdmin#describeTopics may not return information about the
  33. // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
  34. CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
  35. // Delete records whose offset is smaller than the given offset of the corresponding partition.
  36. // This operation is supported by brokers with version 0.11.0.0 or higher.
  37. DeleteRecords(topic string, partitionOffsets map[int32]int64) error
  38. // Get the configuration for the specified resources.
  39. // The returned configuration includes default values and the Default is true
  40. // can be used to distinguish them from user supplied values.
  41. // Config entries where ReadOnly is true cannot be updated.
  42. // The value of config entries where Sensitive is true is always nil so
  43. // sensitive information is not disclosed.
  44. // This operation is supported by brokers with version 0.11.0.0 or higher.
  45. DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
  46. // Update the configuration for the specified resources with the default options.
  47. // This operation is supported by brokers with version 0.11.0.0 or higher.
  48. // The resources with their configs (topic is the only resource type with configs
  49. // that can be updated currently Updates are not transactional so they may succeed
  50. // for some resources while fail for others. The configs for a particular resource are updated automatically.
  51. AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
  52. // Creates access control lists (ACLs) which are bound to specific resources.
  53. // This operation is not transactional so it may succeed for some ACLs while fail for others.
  54. // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
  55. // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
  56. CreateACL(resource Resource, acl Acl) error
  57. // Lists access control lists (ACLs) according to the supplied filter.
  58. // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
  59. // This operation is supported by brokers with version 0.11.0.0 or higher.
  60. ListAcls(filter AclFilter) ([]ResourceAcls, error)
  61. // Deletes access control lists (ACLs) according to the supplied filters.
  62. // This operation is not transactional so it may succeed for some ACLs while fail for others.
  63. // This operation is supported by brokers with version 0.11.0.0 or higher.
  64. DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
  65. // List the consumer groups available in the cluster.
  66. ListConsumerGroups() (map[string]string, error)
  67. // Describe the given consumer groups.
  68. DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
  69. // List the consumer group offsets available in the cluster.
  70. ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
  71. // Delete a consumer group.
  72. DeleteConsumerGroup(group string) error
  73. // Get information about the nodes in the cluster
  74. DescribeCluster() (brokers []*Broker, controllerID int32, err error)
  75. // Close shuts down the admin and closes underlying client.
  76. Close() error
  77. }
  78. type clusterAdmin struct {
  79. client Client
  80. conf *Config
  81. }
  82. // NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
  83. func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
  84. client, err := NewClient(addrs, conf)
  85. if err != nil {
  86. return nil, err
  87. }
  88. return NewClusterAdminFromClient(client)
  89. }
  90. // NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
  91. // Note that underlying client will also be closed on admin's Close() call.
  92. func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
  93. //make sure we can retrieve the controller
  94. _, err := client.Controller()
  95. if err != nil {
  96. return nil, err
  97. }
  98. ca := &clusterAdmin{
  99. client: client,
  100. conf: client.Config(),
  101. }
  102. return ca, nil
  103. }
  104. func (ca *clusterAdmin) Close() error {
  105. return ca.client.Close()
  106. }
  107. func (ca *clusterAdmin) Controller() (*Broker, error) {
  108. return ca.client.Controller()
  109. }
  110. func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
  111. if topic == "" {
  112. return ErrInvalidTopic
  113. }
  114. if detail == nil {
  115. return errors.New("you must specify topic details")
  116. }
  117. topicDetails := make(map[string]*TopicDetail)
  118. topicDetails[topic] = detail
  119. request := &CreateTopicsRequest{
  120. TopicDetails: topicDetails,
  121. ValidateOnly: validateOnly,
  122. Timeout: ca.conf.Admin.Timeout,
  123. }
  124. if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  125. request.Version = 1
  126. }
  127. if ca.conf.Version.IsAtLeast(V1_0_0_0) {
  128. request.Version = 2
  129. }
  130. b, err := ca.Controller()
  131. if err != nil {
  132. return err
  133. }
  134. rsp, err := b.CreateTopics(request)
  135. if err != nil {
  136. return err
  137. }
  138. topicErr, ok := rsp.TopicErrors[topic]
  139. if !ok {
  140. return ErrIncompleteResponse
  141. }
  142. if topicErr.Err != ErrNoError {
  143. return topicErr
  144. }
  145. return nil
  146. }
  147. func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
  148. controller, err := ca.Controller()
  149. if err != nil {
  150. return nil, err
  151. }
  152. request := &MetadataRequest{
  153. Topics: topics,
  154. AllowAutoTopicCreation: false,
  155. }
  156. if ca.conf.Version.IsAtLeast(V1_0_0_0) {
  157. request.Version = 5
  158. } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  159. request.Version = 4
  160. }
  161. response, err := controller.GetMetadata(request)
  162. if err != nil {
  163. return nil, err
  164. }
  165. return response.Topics, nil
  166. }
  167. func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
  168. controller, err := ca.Controller()
  169. if err != nil {
  170. return nil, int32(0), err
  171. }
  172. request := &MetadataRequest{
  173. Topics: []string{},
  174. }
  175. if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  176. request.Version = 1
  177. }
  178. response, err := controller.GetMetadata(request)
  179. if err != nil {
  180. return nil, int32(0), err
  181. }
  182. return response.Brokers, response.ControllerID, nil
  183. }
  184. func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
  185. brokers := ca.client.Brokers()
  186. if len(brokers) > 0 {
  187. index := rand.Intn(len(brokers))
  188. return brokers[index], nil
  189. }
  190. return nil, errors.New("no available broker")
  191. }
  192. func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
  193. // In order to build TopicDetails we need to first get the list of all
  194. // topics using a MetadataRequest and then get their configs using a
  195. // DescribeConfigsRequest request. To avoid sending many requests to the
  196. // broker, we use a single DescribeConfigsRequest.
  197. // Send the all-topic MetadataRequest
  198. b, err := ca.findAnyBroker()
  199. if err != nil {
  200. return nil, err
  201. }
  202. _ = b.Open(ca.client.Config())
  203. metadataReq := &MetadataRequest{}
  204. metadataResp, err := b.GetMetadata(metadataReq)
  205. if err != nil {
  206. return nil, err
  207. }
  208. topicsDetailsMap := make(map[string]TopicDetail)
  209. var describeConfigsResources []*ConfigResource
  210. for _, topic := range metadataResp.Topics {
  211. topicDetails := TopicDetail{
  212. NumPartitions: int32(len(topic.Partitions)),
  213. }
  214. if len(topic.Partitions) > 0 {
  215. topicDetails.ReplicaAssignment = map[int32][]int32{}
  216. for _, partition := range topic.Partitions {
  217. topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
  218. }
  219. topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
  220. }
  221. topicsDetailsMap[topic.Name] = topicDetails
  222. // we populate the resources we want to describe from the MetadataResponse
  223. topicResource := ConfigResource{
  224. Type: TopicResource,
  225. Name: topic.Name,
  226. }
  227. describeConfigsResources = append(describeConfigsResources, &topicResource)
  228. }
  229. // Send the DescribeConfigsRequest
  230. describeConfigsReq := &DescribeConfigsRequest{
  231. Resources: describeConfigsResources,
  232. }
  233. describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
  234. if err != nil {
  235. return nil, err
  236. }
  237. for _, resource := range describeConfigsResp.Resources {
  238. topicDetails := topicsDetailsMap[resource.Name]
  239. topicDetails.ConfigEntries = make(map[string]*string)
  240. for _, entry := range resource.Configs {
  241. // only include non-default non-sensitive config
  242. // (don't actually think topic config will ever be sensitive)
  243. if entry.Default || entry.Sensitive {
  244. continue
  245. }
  246. topicDetails.ConfigEntries[entry.Name] = &entry.Value
  247. }
  248. topicsDetailsMap[resource.Name] = topicDetails
  249. }
  250. return topicsDetailsMap, nil
  251. }
  252. func (ca *clusterAdmin) DeleteTopic(topic string) error {
  253. if topic == "" {
  254. return ErrInvalidTopic
  255. }
  256. request := &DeleteTopicsRequest{
  257. Topics: []string{topic},
  258. Timeout: ca.conf.Admin.Timeout,
  259. }
  260. if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  261. request.Version = 1
  262. }
  263. b, err := ca.Controller()
  264. if err != nil {
  265. return err
  266. }
  267. rsp, err := b.DeleteTopics(request)
  268. if err != nil {
  269. return err
  270. }
  271. topicErr, ok := rsp.TopicErrorCodes[topic]
  272. if !ok {
  273. return ErrIncompleteResponse
  274. }
  275. if topicErr != ErrNoError {
  276. return topicErr
  277. }
  278. return nil
  279. }
  280. func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
  281. if topic == "" {
  282. return ErrInvalidTopic
  283. }
  284. topicPartitions := make(map[string]*TopicPartition)
  285. topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
  286. request := &CreatePartitionsRequest{
  287. TopicPartitions: topicPartitions,
  288. Timeout: ca.conf.Admin.Timeout,
  289. }
  290. b, err := ca.Controller()
  291. if err != nil {
  292. return err
  293. }
  294. rsp, err := b.CreatePartitions(request)
  295. if err != nil {
  296. return err
  297. }
  298. topicErr, ok := rsp.TopicPartitionErrors[topic]
  299. if !ok {
  300. return ErrIncompleteResponse
  301. }
  302. if topicErr.Err != ErrNoError {
  303. return topicErr
  304. }
  305. return nil
  306. }
  307. func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
  308. if topic == "" {
  309. return ErrInvalidTopic
  310. }
  311. partitionPerBroker := make(map[*Broker][]int32)
  312. for partition := range partitionOffsets {
  313. broker, err := ca.client.Leader(topic, partition)
  314. if err != nil {
  315. return err
  316. }
  317. if _, ok := partitionPerBroker[broker]; ok {
  318. partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
  319. } else {
  320. partitionPerBroker[broker] = []int32{partition}
  321. }
  322. }
  323. errs := make([]error, 0)
  324. for broker, partitions := range partitionPerBroker {
  325. topics := make(map[string]*DeleteRecordsRequestTopic)
  326. recordsToDelete := make(map[int32]int64)
  327. for _, p := range partitions {
  328. recordsToDelete[p] = partitionOffsets[p]
  329. }
  330. topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
  331. request := &DeleteRecordsRequest{
  332. Topics: topics,
  333. Timeout: ca.conf.Admin.Timeout,
  334. }
  335. rsp, err := broker.DeleteRecords(request)
  336. if err != nil {
  337. errs = append(errs, err)
  338. } else {
  339. deleteRecordsResponseTopic, ok := rsp.Topics[topic]
  340. if !ok {
  341. errs = append(errs, ErrIncompleteResponse)
  342. } else {
  343. for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
  344. if deleteRecordsResponsePartition.Err != ErrNoError {
  345. errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
  346. }
  347. }
  348. }
  349. }
  350. }
  351. if len(errs) > 0 {
  352. return ErrDeleteRecords{MultiError{&errs}}
  353. }
  354. //todo since we are dealing with couple of partitions it would be good if we return slice of errors
  355. //for each partition instead of one error
  356. return nil
  357. }
  358. func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
  359. var entries []ConfigEntry
  360. var resources []*ConfigResource
  361. resources = append(resources, &resource)
  362. request := &DescribeConfigsRequest{
  363. Resources: resources,
  364. }
  365. b, err := ca.Controller()
  366. if err != nil {
  367. return nil, err
  368. }
  369. rsp, err := b.DescribeConfigs(request)
  370. if err != nil {
  371. return nil, err
  372. }
  373. for _, rspResource := range rsp.Resources {
  374. if rspResource.Name == resource.Name {
  375. if rspResource.ErrorMsg != "" {
  376. return nil, errors.New(rspResource.ErrorMsg)
  377. }
  378. for _, cfgEntry := range rspResource.Configs {
  379. entries = append(entries, *cfgEntry)
  380. }
  381. }
  382. }
  383. return entries, nil
  384. }
  385. func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
  386. var resources []*AlterConfigsResource
  387. resources = append(resources, &AlterConfigsResource{
  388. Type: resourceType,
  389. Name: name,
  390. ConfigEntries: entries,
  391. })
  392. request := &AlterConfigsRequest{
  393. Resources: resources,
  394. ValidateOnly: validateOnly,
  395. }
  396. b, err := ca.Controller()
  397. if err != nil {
  398. return err
  399. }
  400. rsp, err := b.AlterConfigs(request)
  401. if err != nil {
  402. return err
  403. }
  404. for _, rspResource := range rsp.Resources {
  405. if rspResource.Name == name {
  406. if rspResource.ErrorMsg != "" {
  407. return errors.New(rspResource.ErrorMsg)
  408. }
  409. }
  410. }
  411. return nil
  412. }
  413. func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
  414. var acls []*AclCreation
  415. acls = append(acls, &AclCreation{resource, acl})
  416. request := &CreateAclsRequest{AclCreations: acls}
  417. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  418. request.Version = 1
  419. }
  420. b, err := ca.Controller()
  421. if err != nil {
  422. return err
  423. }
  424. _, err = b.CreateAcls(request)
  425. return err
  426. }
  427. func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
  428. request := &DescribeAclsRequest{AclFilter: filter}
  429. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  430. request.Version = 1
  431. }
  432. b, err := ca.Controller()
  433. if err != nil {
  434. return nil, err
  435. }
  436. rsp, err := b.DescribeAcls(request)
  437. if err != nil {
  438. return nil, err
  439. }
  440. var lAcls []ResourceAcls
  441. for _, rAcl := range rsp.ResourceAcls {
  442. lAcls = append(lAcls, *rAcl)
  443. }
  444. return lAcls, nil
  445. }
  446. func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
  447. var filters []*AclFilter
  448. filters = append(filters, &filter)
  449. request := &DeleteAclsRequest{Filters: filters}
  450. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  451. request.Version = 1
  452. }
  453. b, err := ca.Controller()
  454. if err != nil {
  455. return nil, err
  456. }
  457. rsp, err := b.DeleteAcls(request)
  458. if err != nil {
  459. return nil, err
  460. }
  461. var mAcls []MatchingAcl
  462. for _, fr := range rsp.FilterResponses {
  463. for _, mACL := range fr.MatchingAcls {
  464. mAcls = append(mAcls, *mACL)
  465. }
  466. }
  467. return mAcls, nil
  468. }
  469. func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
  470. groupsPerBroker := make(map[*Broker][]string)
  471. for _, group := range groups {
  472. controller, err := ca.client.Coordinator(group)
  473. if err != nil {
  474. return nil, err
  475. }
  476. groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
  477. }
  478. for broker, brokerGroups := range groupsPerBroker {
  479. response, err := broker.DescribeGroups(&DescribeGroupsRequest{
  480. Groups: brokerGroups,
  481. })
  482. if err != nil {
  483. return nil, err
  484. }
  485. result = append(result, response.Groups...)
  486. }
  487. return result, nil
  488. }
  489. func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
  490. allGroups = make(map[string]string)
  491. // Query brokers in parallel, since we have to query *all* brokers
  492. brokers := ca.client.Brokers()
  493. groupMaps := make(chan map[string]string, len(brokers))
  494. errors := make(chan error, len(brokers))
  495. wg := sync.WaitGroup{}
  496. for _, b := range brokers {
  497. wg.Add(1)
  498. go func(b *Broker, conf *Config) {
  499. defer wg.Done()
  500. _ = b.Open(conf) // Ensure that broker is opened
  501. response, err := b.ListGroups(&ListGroupsRequest{})
  502. if err != nil {
  503. errors <- err
  504. return
  505. }
  506. groups := make(map[string]string)
  507. for group, typ := range response.Groups {
  508. groups[group] = typ
  509. }
  510. groupMaps <- groups
  511. }(b, ca.conf)
  512. }
  513. wg.Wait()
  514. close(groupMaps)
  515. close(errors)
  516. for groupMap := range groupMaps {
  517. for group, protocolType := range groupMap {
  518. allGroups[group] = protocolType
  519. }
  520. }
  521. // Intentionally return only the first error for simplicity
  522. err = <-errors
  523. return
  524. }
  525. func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
  526. coordinator, err := ca.client.Coordinator(group)
  527. if err != nil {
  528. return nil, err
  529. }
  530. request := &OffsetFetchRequest{
  531. ConsumerGroup: group,
  532. partitions: topicPartitions,
  533. }
  534. if ca.conf.Version.IsAtLeast(V0_10_2_0) {
  535. request.Version = 2
  536. } else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
  537. request.Version = 1
  538. }
  539. return coordinator.FetchOffset(request)
  540. }
  541. func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
  542. coordinator, err := ca.client.Coordinator(group)
  543. if err != nil {
  544. return err
  545. }
  546. request := &DeleteGroupsRequest{
  547. Groups: []string{group},
  548. }
  549. resp, err := coordinator.DeleteGroups(request)
  550. if err != nil {
  551. return err
  552. }
  553. groupErr, ok := resp.GroupErrorCodes[group]
  554. if !ok {
  555. return ErrIncompleteResponse
  556. }
  557. if groupErr != ErrNoError {
  558. return groupErr
  559. }
  560. return nil
  561. }