admin.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. package sarama
  2. import "errors"
  3. // ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
  4. // brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
  5. // Methods with stricter requirements will specify the minimum broker version required.
  6. // You MUST call Close() on a client to avoid leaks
  7. type ClusterAdmin interface {
  8. // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
  9. // It may take several seconds after CreateTopic returns success for all the brokers
  10. // to become aware that the topic has been created. During this time, listTopics
  11. // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
  12. CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
  13. // Delete a topic. It may take several seconds after the DeleteTopic to returns success
  14. // and for all the brokers to become aware that the topics are gone.
  15. // During this time, listTopics may continue to return information about the deleted topic.
  16. // If delete.topic.enable is false on the brokers, deleteTopic will mark
  17. // the topic for deletion, but not actually delete them.
  18. // This operation is supported by brokers with version 0.10.1.0 or higher.
  19. DeleteTopic(topic string) error
  20. // Increase the number of partitions of the topics according to the corresponding values.
  21. // If partitions are increased for a topic that has a key, the partition logic or ordering of
  22. // the messages will be affected. It may take several seconds after this method returns
  23. // success for all the brokers to become aware that the partitions have been created.
  24. // During this time, ClusterAdmin#describeTopics may not return information about the
  25. // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
  26. CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
  27. // Delete records whose offset is smaller than the given offset of the corresponding partition.
  28. // This operation is supported by brokers with version 0.11.0.0 or higher.
  29. DeleteRecords(topic string, partitionOffsets map[int32]int64) error
  30. // Get the configuration for the specified resources.
  31. // The returned configuration includes default values and the Default is true
  32. // can be used to distinguish them from user supplied values.
  33. // Config entries where ReadOnly is true cannot be updated.
  34. // The value of config entries where Sensitive is true is always nil so
  35. // sensitive information is not disclosed.
  36. // This operation is supported by brokers with version 0.11.0.0 or higher.
  37. DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
  38. // Update the configuration for the specified resources with the default options.
  39. // This operation is supported by brokers with version 0.11.0.0 or higher.
  40. // The resources with their configs (topic is the only resource type with configs
  41. // that can be updated currently Updates are not transactional so they may succeed
  42. // for some resources while fail for others. The configs for a particular resource are updated automatically.
  43. AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
  44. // Creates access control lists (ACLs) which are bound to specific resources.
  45. // This operation is not transactional so it may succeed for some ACLs while fail for others.
  46. // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
  47. // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
  48. CreateACL(resource Resource, acl Acl) error
  49. // Lists access control lists (ACLs) according to the supplied filter.
  50. // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
  51. // This operation is supported by brokers with version 0.11.0.0 or higher.
  52. ListAcls(filter AclFilter) ([]ResourceAcls, error)
  53. // Deletes access control lists (ACLs) according to the supplied filters.
  54. // This operation is not transactional so it may succeed for some ACLs while fail for others.
  55. // This operation is supported by brokers with version 0.11.0.0 or higher.
  56. DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
  57. // Close shuts down the admin and closes underlying client.
  58. Close() error
  59. }
  60. type clusterAdmin struct {
  61. client Client
  62. conf *Config
  63. }
  64. // NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
  65. func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
  66. client, err := NewClient(addrs, conf)
  67. if err != nil {
  68. return nil, err
  69. }
  70. //make sure we can retrieve the controller
  71. _, err = client.Controller()
  72. if err != nil {
  73. return nil, err
  74. }
  75. ca := &clusterAdmin{
  76. client: client,
  77. conf: client.Config(),
  78. }
  79. return ca, nil
  80. }
  81. func (ca *clusterAdmin) Close() error {
  82. return ca.client.Close()
  83. }
  84. func (ca *clusterAdmin) Controller() (*Broker, error) {
  85. return ca.client.Controller()
  86. }
  87. func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
  88. if topic == "" {
  89. return ErrInvalidTopic
  90. }
  91. if detail == nil {
  92. return errors.New("You must specify topic details")
  93. }
  94. topicDetails := make(map[string]*TopicDetail)
  95. topicDetails[topic] = detail
  96. request := &CreateTopicsRequest{
  97. TopicDetails: topicDetails,
  98. ValidateOnly: validateOnly,
  99. Timeout: ca.conf.Admin.Timeout,
  100. }
  101. if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  102. request.Version = 1
  103. }
  104. if ca.conf.Version.IsAtLeast(V1_0_0_0) {
  105. request.Version = 2
  106. }
  107. b, err := ca.Controller()
  108. if err != nil {
  109. return err
  110. }
  111. rsp, err := b.CreateTopics(request)
  112. if err != nil {
  113. return err
  114. }
  115. topicErr, ok := rsp.TopicErrors[topic]
  116. if !ok {
  117. return ErrIncompleteResponse
  118. }
  119. if topicErr.Err != ErrNoError {
  120. return topicErr.Err
  121. }
  122. return nil
  123. }
  124. func (ca *clusterAdmin) DeleteTopic(topic string) error {
  125. if topic == "" {
  126. return ErrInvalidTopic
  127. }
  128. request := &DeleteTopicsRequest{
  129. Topics: []string{topic},
  130. Timeout: ca.conf.Admin.Timeout,
  131. }
  132. if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  133. request.Version = 1
  134. }
  135. b, err := ca.Controller()
  136. if err != nil {
  137. return err
  138. }
  139. rsp, err := b.DeleteTopics(request)
  140. if err != nil {
  141. return err
  142. }
  143. topicErr, ok := rsp.TopicErrorCodes[topic]
  144. if !ok {
  145. return ErrIncompleteResponse
  146. }
  147. if topicErr != ErrNoError {
  148. return topicErr
  149. }
  150. return nil
  151. }
  152. func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
  153. if topic == "" {
  154. return ErrInvalidTopic
  155. }
  156. topicPartitions := make(map[string]*TopicPartition)
  157. topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
  158. request := &CreatePartitionsRequest{
  159. TopicPartitions: topicPartitions,
  160. Timeout: ca.conf.Admin.Timeout,
  161. }
  162. b, err := ca.Controller()
  163. if err != nil {
  164. return err
  165. }
  166. rsp, err := b.CreatePartitions(request)
  167. if err != nil {
  168. return err
  169. }
  170. topicErr, ok := rsp.TopicPartitionErrors[topic]
  171. if !ok {
  172. return ErrIncompleteResponse
  173. }
  174. if topicErr.Err != ErrNoError {
  175. return topicErr.Err
  176. }
  177. return nil
  178. }
  179. func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
  180. if topic == "" {
  181. return ErrInvalidTopic
  182. }
  183. topics := make(map[string]*DeleteRecordsRequestTopic)
  184. topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets}
  185. request := &DeleteRecordsRequest{
  186. Topics: topics,
  187. Timeout: ca.conf.Admin.Timeout,
  188. }
  189. b, err := ca.Controller()
  190. if err != nil {
  191. return err
  192. }
  193. rsp, err := b.DeleteRecords(request)
  194. if err != nil {
  195. return err
  196. }
  197. _, ok := rsp.Topics[topic]
  198. if !ok {
  199. return ErrIncompleteResponse
  200. }
  201. //todo since we are dealing with couple of partitions it would be good if we return slice of errors
  202. //for each partition instead of one error
  203. return nil
  204. }
  205. func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
  206. var entries []ConfigEntry
  207. var resources []*ConfigResource
  208. resources = append(resources, &resource)
  209. request := &DescribeConfigsRequest{
  210. Resources: resources,
  211. }
  212. b, err := ca.Controller()
  213. if err != nil {
  214. return nil, err
  215. }
  216. rsp, err := b.DescribeConfigs(request)
  217. if err != nil {
  218. return nil, err
  219. }
  220. for _, rspResource := range rsp.Resources {
  221. if rspResource.Name == resource.Name {
  222. if rspResource.ErrorMsg != "" {
  223. return nil, errors.New(rspResource.ErrorMsg)
  224. }
  225. for _, cfgEntry := range rspResource.Configs {
  226. entries = append(entries, *cfgEntry)
  227. }
  228. }
  229. }
  230. return entries, nil
  231. }
  232. func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
  233. var resources []*AlterConfigsResource
  234. resources = append(resources, &AlterConfigsResource{
  235. Type: resourceType,
  236. Name: name,
  237. ConfigEntries: entries,
  238. })
  239. request := &AlterConfigsRequest{
  240. Resources: resources,
  241. ValidateOnly: validateOnly,
  242. }
  243. b, err := ca.Controller()
  244. if err != nil {
  245. return err
  246. }
  247. rsp, err := b.AlterConfigs(request)
  248. if err != nil {
  249. return err
  250. }
  251. for _, rspResource := range rsp.Resources {
  252. if rspResource.Name == name {
  253. if rspResource.ErrorMsg != "" {
  254. return errors.New(rspResource.ErrorMsg)
  255. }
  256. }
  257. }
  258. return nil
  259. }
  260. func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
  261. var acls []*AclCreation
  262. acls = append(acls, &AclCreation{resource, acl})
  263. request := &CreateAclsRequest{AclCreations: acls}
  264. b, err := ca.Controller()
  265. if err != nil {
  266. return err
  267. }
  268. _, err = b.CreateAcls(request)
  269. return err
  270. }
  271. func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
  272. request := &DescribeAclsRequest{AclFilter: filter}
  273. b, err := ca.Controller()
  274. if err != nil {
  275. return nil, err
  276. }
  277. rsp, err := b.DescribeAcls(request)
  278. if err != nil {
  279. return nil, err
  280. }
  281. var lAcls []ResourceAcls
  282. for _, rAcl := range rsp.ResourceAcls {
  283. lAcls = append(lAcls, *rAcl)
  284. }
  285. return lAcls, nil
  286. }
  287. func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
  288. var filters []*AclFilter
  289. filters = append(filters, &filter)
  290. request := &DeleteAclsRequest{Filters: filters}
  291. b, err := ca.Controller()
  292. if err != nil {
  293. return nil, err
  294. }
  295. rsp, err := b.DeleteAcls(request)
  296. if err != nil {
  297. return nil, err
  298. }
  299. var mAcls []MatchingAcl
  300. for _, fr := range rsp.FilterResponses {
  301. for _, mACL := range fr.MatchingAcls {
  302. mAcls = append(mAcls, *mACL)
  303. }
  304. }
  305. return mAcls, nil
  306. }