admin.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610
  1. package sarama
  2. import (
  3. "errors"
  4. "math/rand"
  5. "sync"
  6. )
  7. // ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
  8. // brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
  9. // Methods with stricter requirements will specify the minimum broker version required.
  10. // You MUST call Close() on a client to avoid leaks
  11. type ClusterAdmin interface {
  12. // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
  13. // It may take several seconds after CreateTopic returns success for all the brokers
  14. // to become aware that the topic has been created. During this time, listTopics
  15. // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
  16. CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
  17. // List the topics available in the cluster with the default options.
  18. ListTopics() (map[string]TopicDetail, error)
  19. // Describe some topics in the cluster
  20. DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
  21. // Delete a topic. It may take several seconds after the DeleteTopic to returns success
  22. // and for all the brokers to become aware that the topics are gone.
  23. // During this time, listTopics may continue to return information about the deleted topic.
  24. // If delete.topic.enable is false on the brokers, deleteTopic will mark
  25. // the topic for deletion, but not actually delete them.
  26. // This operation is supported by brokers with version 0.10.1.0 or higher.
  27. DeleteTopic(topic string) error
  28. // Increase the number of partitions of the topics according to the corresponding values.
  29. // If partitions are increased for a topic that has a key, the partition logic or ordering of
  30. // the messages will be affected. It may take several seconds after this method returns
  31. // success for all the brokers to become aware that the partitions have been created.
  32. // During this time, ClusterAdmin#describeTopics may not return information about the
  33. // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
  34. CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
  35. // Delete records whose offset is smaller than the given offset of the corresponding partition.
  36. // This operation is supported by brokers with version 0.11.0.0 or higher.
  37. DeleteRecords(topic string, partitionOffsets map[int32]int64) error
  38. // Get the configuration for the specified resources.
  39. // The returned configuration includes default values and the Default is true
  40. // can be used to distinguish them from user supplied values.
  41. // Config entries where ReadOnly is true cannot be updated.
  42. // The value of config entries where Sensitive is true is always nil so
  43. // sensitive information is not disclosed.
  44. // This operation is supported by brokers with version 0.11.0.0 or higher.
  45. DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
  46. // Update the configuration for the specified resources with the default options.
  47. // This operation is supported by brokers with version 0.11.0.0 or higher.
  48. // The resources with their configs (topic is the only resource type with configs
  49. // that can be updated currently Updates are not transactional so they may succeed
  50. // for some resources while fail for others. The configs for a particular resource are updated automatically.
  51. AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
  52. // Creates access control lists (ACLs) which are bound to specific resources.
  53. // This operation is not transactional so it may succeed for some ACLs while fail for others.
  54. // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
  55. // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
  56. CreateACL(resource Resource, acl Acl) error
  57. // Lists access control lists (ACLs) according to the supplied filter.
  58. // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
  59. // This operation is supported by brokers with version 0.11.0.0 or higher.
  60. ListAcls(filter AclFilter) ([]ResourceAcls, error)
  61. // Deletes access control lists (ACLs) according to the supplied filters.
  62. // This operation is not transactional so it may succeed for some ACLs while fail for others.
  63. // This operation is supported by brokers with version 0.11.0.0 or higher.
  64. DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
  65. // List the consumer groups available in the cluster.
  66. ListConsumerGroups() (map[string]string, error)
  67. // Describe the given consumer group
  68. DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
  69. // List the consumer group offsets available in the cluster.
  70. ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
  71. // Get information about the nodes in the cluster
  72. DescribeCluster() (brokers []*Broker, controllerID int32, err error)
  73. // Close shuts down the admin and closes underlying client.
  74. Close() error
  75. }
  76. type clusterAdmin struct {
  77. client Client
  78. conf *Config
  79. }
  80. // NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
  81. func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
  82. client, err := NewClient(addrs, conf)
  83. if err != nil {
  84. return nil, err
  85. }
  86. //make sure we can retrieve the controller
  87. _, err = client.Controller()
  88. if err != nil {
  89. return nil, err
  90. }
  91. ca := &clusterAdmin{
  92. client: client,
  93. conf: client.Config(),
  94. }
  95. return ca, nil
  96. }
  97. func (ca *clusterAdmin) Close() error {
  98. return ca.client.Close()
  99. }
  100. func (ca *clusterAdmin) Controller() (*Broker, error) {
  101. return ca.client.Controller()
  102. }
  103. func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
  104. if topic == "" {
  105. return ErrInvalidTopic
  106. }
  107. if detail == nil {
  108. return errors.New("You must specify topic details")
  109. }
  110. topicDetails := make(map[string]*TopicDetail)
  111. topicDetails[topic] = detail
  112. request := &CreateTopicsRequest{
  113. TopicDetails: topicDetails,
  114. ValidateOnly: validateOnly,
  115. Timeout: ca.conf.Admin.Timeout,
  116. }
  117. if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  118. request.Version = 1
  119. }
  120. if ca.conf.Version.IsAtLeast(V1_0_0_0) {
  121. request.Version = 2
  122. }
  123. b, err := ca.Controller()
  124. if err != nil {
  125. return err
  126. }
  127. rsp, err := b.CreateTopics(request)
  128. if err != nil {
  129. return err
  130. }
  131. topicErr, ok := rsp.TopicErrors[topic]
  132. if !ok {
  133. return ErrIncompleteResponse
  134. }
  135. if topicErr.Err != ErrNoError {
  136. return topicErr.Err
  137. }
  138. return nil
  139. }
  140. func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
  141. controller, err := ca.Controller()
  142. if err != nil {
  143. return nil, err
  144. }
  145. request := &MetadataRequest{
  146. Topics: topics,
  147. AllowAutoTopicCreation: false,
  148. }
  149. if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  150. request.Version = 4
  151. }
  152. response, err := controller.GetMetadata(request)
  153. if err != nil {
  154. return nil, err
  155. }
  156. return response.Topics, nil
  157. }
  158. func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
  159. controller, err := ca.Controller()
  160. if err != nil {
  161. return nil, int32(0), err
  162. }
  163. request := &MetadataRequest{
  164. Topics: []string{},
  165. }
  166. response, err := controller.GetMetadata(request)
  167. if err != nil {
  168. return nil, int32(0), err
  169. }
  170. return response.Brokers, response.ControllerID, nil
  171. }
  172. func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
  173. brokers := ca.client.Brokers()
  174. if len(brokers) > 0 {
  175. index := rand.Intn(len(brokers))
  176. return brokers[index], nil
  177. }
  178. return nil, errors.New("no available broker")
  179. }
  180. func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
  181. // In order to build TopicDetails we need to first get the list of all
  182. // topics using a MetadataRequest and then get their configs using a
  183. // DescribeConfigsRequest request. To avoid sending many requests to the
  184. // broker, we use a single DescribeConfigsRequest.
  185. // Send the all-topic MetadataRequest
  186. b, err := ca.findAnyBroker()
  187. if err != nil {
  188. return nil, err
  189. }
  190. _ = b.Open(ca.client.Config())
  191. metadataReq := &MetadataRequest{}
  192. metadataResp, err := b.GetMetadata(metadataReq)
  193. if err != nil {
  194. return nil, err
  195. }
  196. topicsDetailsMap := make(map[string]TopicDetail)
  197. var describeConfigsResources []*ConfigResource
  198. for _, topic := range metadataResp.Topics {
  199. topicDetails := TopicDetail{
  200. NumPartitions: int32(len(topic.Partitions)),
  201. }
  202. if len(topic.Partitions) > 0 {
  203. topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
  204. }
  205. topicsDetailsMap[topic.Name] = topicDetails
  206. // we populate the resources we want to describe from the MetadataResponse
  207. topicResource := ConfigResource{
  208. Type: TopicResource,
  209. Name: topic.Name,
  210. }
  211. describeConfigsResources = append(describeConfigsResources, &topicResource)
  212. }
  213. // Send the DescribeConfigsRequest
  214. describeConfigsReq := &DescribeConfigsRequest{
  215. Resources: describeConfigsResources,
  216. }
  217. describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
  218. if err != nil {
  219. return nil, err
  220. }
  221. for _, resource := range describeConfigsResp.Resources {
  222. topicDetails := topicsDetailsMap[resource.Name]
  223. topicDetails.ConfigEntries = make(map[string]*string)
  224. for _, entry := range resource.Configs {
  225. // only include non-default non-sensitive config
  226. // (don't actually think topic config will ever be sensitive)
  227. if entry.Default || entry.Sensitive {
  228. continue
  229. }
  230. topicDetails.ConfigEntries[entry.Name] = &entry.Value
  231. }
  232. topicsDetailsMap[resource.Name] = topicDetails
  233. }
  234. return topicsDetailsMap, nil
  235. }
  236. func (ca *clusterAdmin) DeleteTopic(topic string) error {
  237. if topic == "" {
  238. return ErrInvalidTopic
  239. }
  240. request := &DeleteTopicsRequest{
  241. Topics: []string{topic},
  242. Timeout: ca.conf.Admin.Timeout,
  243. }
  244. if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  245. request.Version = 1
  246. }
  247. b, err := ca.Controller()
  248. if err != nil {
  249. return err
  250. }
  251. rsp, err := b.DeleteTopics(request)
  252. if err != nil {
  253. return err
  254. }
  255. topicErr, ok := rsp.TopicErrorCodes[topic]
  256. if !ok {
  257. return ErrIncompleteResponse
  258. }
  259. if topicErr != ErrNoError {
  260. return topicErr
  261. }
  262. return nil
  263. }
  264. func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
  265. if topic == "" {
  266. return ErrInvalidTopic
  267. }
  268. topicPartitions := make(map[string]*TopicPartition)
  269. topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
  270. request := &CreatePartitionsRequest{
  271. TopicPartitions: topicPartitions,
  272. Timeout: ca.conf.Admin.Timeout,
  273. }
  274. b, err := ca.Controller()
  275. if err != nil {
  276. return err
  277. }
  278. rsp, err := b.CreatePartitions(request)
  279. if err != nil {
  280. return err
  281. }
  282. topicErr, ok := rsp.TopicPartitionErrors[topic]
  283. if !ok {
  284. return ErrIncompleteResponse
  285. }
  286. if topicErr.Err != ErrNoError {
  287. return topicErr.Err
  288. }
  289. return nil
  290. }
  291. func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
  292. if topic == "" {
  293. return ErrInvalidTopic
  294. }
  295. topics := make(map[string]*DeleteRecordsRequestTopic)
  296. topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets}
  297. request := &DeleteRecordsRequest{
  298. Topics: topics,
  299. Timeout: ca.conf.Admin.Timeout,
  300. }
  301. b, err := ca.Controller()
  302. if err != nil {
  303. return err
  304. }
  305. rsp, err := b.DeleteRecords(request)
  306. if err != nil {
  307. return err
  308. }
  309. _, ok := rsp.Topics[topic]
  310. if !ok {
  311. return ErrIncompleteResponse
  312. }
  313. //todo since we are dealing with couple of partitions it would be good if we return slice of errors
  314. //for each partition instead of one error
  315. return nil
  316. }
  317. func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
  318. var entries []ConfigEntry
  319. var resources []*ConfigResource
  320. resources = append(resources, &resource)
  321. request := &DescribeConfigsRequest{
  322. Resources: resources,
  323. }
  324. b, err := ca.Controller()
  325. if err != nil {
  326. return nil, err
  327. }
  328. rsp, err := b.DescribeConfigs(request)
  329. if err != nil {
  330. return nil, err
  331. }
  332. for _, rspResource := range rsp.Resources {
  333. if rspResource.Name == resource.Name {
  334. if rspResource.ErrorMsg != "" {
  335. return nil, errors.New(rspResource.ErrorMsg)
  336. }
  337. for _, cfgEntry := range rspResource.Configs {
  338. entries = append(entries, *cfgEntry)
  339. }
  340. }
  341. }
  342. return entries, nil
  343. }
  344. func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
  345. var resources []*AlterConfigsResource
  346. resources = append(resources, &AlterConfigsResource{
  347. Type: resourceType,
  348. Name: name,
  349. ConfigEntries: entries,
  350. })
  351. request := &AlterConfigsRequest{
  352. Resources: resources,
  353. ValidateOnly: validateOnly,
  354. }
  355. b, err := ca.Controller()
  356. if err != nil {
  357. return err
  358. }
  359. rsp, err := b.AlterConfigs(request)
  360. if err != nil {
  361. return err
  362. }
  363. for _, rspResource := range rsp.Resources {
  364. if rspResource.Name == name {
  365. if rspResource.ErrorMsg != "" {
  366. return errors.New(rspResource.ErrorMsg)
  367. }
  368. }
  369. }
  370. return nil
  371. }
  372. func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
  373. var acls []*AclCreation
  374. acls = append(acls, &AclCreation{resource, acl})
  375. request := &CreateAclsRequest{AclCreations: acls}
  376. b, err := ca.Controller()
  377. if err != nil {
  378. return err
  379. }
  380. _, err = b.CreateAcls(request)
  381. return err
  382. }
  383. func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
  384. request := &DescribeAclsRequest{AclFilter: filter}
  385. b, err := ca.Controller()
  386. if err != nil {
  387. return nil, err
  388. }
  389. rsp, err := b.DescribeAcls(request)
  390. if err != nil {
  391. return nil, err
  392. }
  393. var lAcls []ResourceAcls
  394. for _, rAcl := range rsp.ResourceAcls {
  395. lAcls = append(lAcls, *rAcl)
  396. }
  397. return lAcls, nil
  398. }
  399. func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
  400. var filters []*AclFilter
  401. filters = append(filters, &filter)
  402. request := &DeleteAclsRequest{Filters: filters}
  403. b, err := ca.Controller()
  404. if err != nil {
  405. return nil, err
  406. }
  407. rsp, err := b.DeleteAcls(request)
  408. if err != nil {
  409. return nil, err
  410. }
  411. var mAcls []MatchingAcl
  412. for _, fr := range rsp.FilterResponses {
  413. for _, mACL := range fr.MatchingAcls {
  414. mAcls = append(mAcls, *mACL)
  415. }
  416. }
  417. return mAcls, nil
  418. }
  419. func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
  420. groupsPerBroker := make(map[*Broker][]string)
  421. for _, group := range groups {
  422. controller, err := ca.client.Coordinator(group)
  423. if err != nil {
  424. return nil, err
  425. }
  426. groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
  427. }
  428. for broker, brokerGroups := range groupsPerBroker {
  429. response, err := broker.DescribeGroups(&DescribeGroupsRequest{
  430. Groups: brokerGroups,
  431. })
  432. if err != nil {
  433. return nil, err
  434. }
  435. result = append(result, response.Groups...)
  436. }
  437. return result, nil
  438. }
  439. func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
  440. allGroups = make(map[string]string)
  441. // Query brokers in parallel, since we have to query *all* brokers
  442. brokers := ca.client.Brokers()
  443. groupMaps := make(chan map[string]string, len(brokers))
  444. errors := make(chan error, len(brokers))
  445. wg := sync.WaitGroup{}
  446. for _, b := range brokers {
  447. wg.Add(1)
  448. go func(b *Broker, conf *Config) {
  449. defer wg.Done()
  450. _ = b.Open(conf) // Ensure that broker is opened
  451. response, err := b.ListGroups(&ListGroupsRequest{})
  452. if err != nil {
  453. errors <- err
  454. return
  455. }
  456. groups := make(map[string]string)
  457. for group, typ := range response.Groups {
  458. groups[group] = typ
  459. }
  460. groupMaps <- groups
  461. }(b, ca.conf)
  462. }
  463. wg.Wait()
  464. close(groupMaps)
  465. close(errors)
  466. for groupMap := range groupMaps {
  467. for group, protocolType := range groupMap {
  468. allGroups[group] = protocolType
  469. }
  470. }
  471. // Intentionally return only the first error for simplicity
  472. err = <-errors
  473. return
  474. }
  475. func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
  476. coordinator, err := ca.client.Coordinator(group)
  477. if err != nil {
  478. return nil, err
  479. }
  480. request := &OffsetFetchRequest{
  481. ConsumerGroup: group,
  482. partitions: topicPartitions,
  483. }
  484. if ca.conf.Version.IsAtLeast(V0_8_2_2) {
  485. request.Version = 1
  486. }
  487. return coordinator.FetchOffset(request)
  488. }