admin_test.go 27 KB


  1. package sarama
  2. import (
  3. "errors"
  4. "fmt"
  5. "io/ioutil"
  6. "log"
  7. "os"
  8. "strings"
  9. "testing"
  10. )
  11. func TestClusterAdmin(t *testing.T) {
  12. seedBroker := NewMockBroker(t, 1)
  13. defer seedBroker.Close()
  14. seedBroker.SetHandlerByMap(map[string]MockResponse{
  15. "MetadataRequest": NewMockMetadataResponse(t).
  16. SetController(seedBroker.BrokerID()).
  17. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  18. })
  19. config := NewConfig()
  20. config.Version = V1_0_0_0
  21. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  22. if err != nil {
  23. t.Fatal(err)
  24. }
  25. err = admin.Close()
  26. if err != nil {
  27. t.Fatal(err)
  28. }
  29. }
  30. func TestClusterAdminInvalidController(t *testing.T) {
  31. seedBroker := NewMockBroker(t, 1)
  32. defer seedBroker.Close()
  33. seedBroker.SetHandlerByMap(map[string]MockResponse{
  34. "MetadataRequest": NewMockMetadataResponse(t).
  35. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  36. })
  37. config := NewConfig()
  38. config.Version = V1_0_0_0
  39. _, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  40. if err == nil {
  41. t.Fatal(errors.New("controller not set still cluster admin was created"))
  42. }
  43. if err != ErrControllerNotAvailable {
  44. t.Fatal(err)
  45. }
  46. }
  47. func TestClusterAdminCreateTopic(t *testing.T) {
  48. seedBroker := NewMockBroker(t, 1)
  49. defer seedBroker.Close()
  50. seedBroker.SetHandlerByMap(map[string]MockResponse{
  51. "MetadataRequest": NewMockMetadataResponse(t).
  52. SetController(seedBroker.BrokerID()).
  53. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  54. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  55. })
  56. config := NewConfig()
  57. config.Version = V0_10_2_0
  58. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  59. if err != nil {
  60. t.Fatal(err)
  61. }
  62. err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  63. if err != nil {
  64. t.Fatal(err)
  65. }
  66. err = admin.Close()
  67. if err != nil {
  68. t.Fatal(err)
  69. }
  70. }
  71. func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) {
  72. seedBroker := NewMockBroker(t, 1)
  73. defer seedBroker.Close()
  74. seedBroker.SetHandlerByMap(map[string]MockResponse{
  75. "MetadataRequest": NewMockMetadataResponse(t).
  76. SetController(seedBroker.BrokerID()).
  77. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  78. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  79. })
  80. config := NewConfig()
  81. config.Version = V0_10_2_0
  82. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  83. if err != nil {
  84. t.Fatal(err)
  85. }
  86. err = admin.CreateTopic("my_topic", nil, false)
  87. if err.Error() != "you must specify topic details" {
  88. t.Fatal(err)
  89. }
  90. err = admin.Close()
  91. if err != nil {
  92. t.Fatal(err)
  93. }
  94. }
  95. func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) {
  96. seedBroker := NewMockBroker(t, 1)
  97. defer seedBroker.Close()
  98. seedBroker.SetHandlerByMap(map[string]MockResponse{
  99. "MetadataRequest": NewMockMetadataResponse(t).
  100. SetController(seedBroker.BrokerID()).
  101. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  102. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  103. })
  104. config := NewConfig()
  105. config.Version = V0_11_0_0
  106. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  107. if err != nil {
  108. t.Fatal(err)
  109. }
  110. err = admin.CreateTopic("_internal_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  111. want := "insufficient permissions to create topic with reserved prefix"
  112. if !strings.HasSuffix(err.Error(), want) {
  113. t.Fatal(err)
  114. }
  115. err = admin.Close()
  116. if err != nil {
  117. t.Fatal(err)
  118. }
  119. }
  120. func TestClusterAdminListTopics(t *testing.T) {
  121. seedBroker := NewMockBroker(t, 1)
  122. defer seedBroker.Close()
  123. seedBroker.SetHandlerByMap(map[string]MockResponse{
  124. "MetadataRequest": NewMockMetadataResponse(t).
  125. SetController(seedBroker.BrokerID()).
  126. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  127. SetLeader("my_topic", 0, seedBroker.BrokerID()),
  128. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  129. })
  130. config := NewConfig()
  131. config.Version = V1_0_0_0
  132. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  133. if err != nil {
  134. t.Fatal(err)
  135. }
  136. entries, err := admin.ListTopics()
  137. if err != nil {
  138. t.Fatal(err)
  139. }
  140. if len(entries) <= 0 {
  141. t.Fatal(errors.New("no resource present"))
  142. }
  143. topic, found := entries["my_topic"]
  144. if !found {
  145. t.Fatal(errors.New("topic not found in response"))
  146. }
  147. _, found = topic.ConfigEntries["max.message.bytes"]
  148. if found {
  149. t.Fatal(errors.New("default topic config entry incorrectly found in response"))
  150. }
  151. value, _ := topic.ConfigEntries["retention.ms"]
  152. if value == nil || *value != "5000" {
  153. t.Fatal(errors.New("non-default topic config entry not found in response"))
  154. }
  155. err = admin.Close()
  156. if err != nil {
  157. t.Fatal(err)
  158. }
  159. if topic.ReplicaAssignment == nil || topic.ReplicaAssignment[0][0] != 1 {
  160. t.Fatal(errors.New("replica assignment not found in response"))
  161. }
  162. }
  163. func TestClusterAdminDeleteTopic(t *testing.T) {
  164. seedBroker := NewMockBroker(t, 1)
  165. defer seedBroker.Close()
  166. seedBroker.SetHandlerByMap(map[string]MockResponse{
  167. "MetadataRequest": NewMockMetadataResponse(t).
  168. SetController(seedBroker.BrokerID()).
  169. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  170. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  171. })
  172. config := NewConfig()
  173. config.Version = V0_10_2_0
  174. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  175. if err != nil {
  176. t.Fatal(err)
  177. }
  178. err = admin.DeleteTopic("my_topic")
  179. if err != nil {
  180. t.Fatal(err)
  181. }
  182. err = admin.Close()
  183. if err != nil {
  184. t.Fatal(err)
  185. }
  186. }
  187. func TestClusterAdminDeleteEmptyTopic(t *testing.T) {
  188. seedBroker := NewMockBroker(t, 1)
  189. defer seedBroker.Close()
  190. seedBroker.SetHandlerByMap(map[string]MockResponse{
  191. "MetadataRequest": NewMockMetadataResponse(t).
  192. SetController(seedBroker.BrokerID()).
  193. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  194. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  195. })
  196. config := NewConfig()
  197. config.Version = V0_10_2_0
  198. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  199. if err != nil {
  200. t.Fatal(err)
  201. }
  202. err = admin.DeleteTopic("")
  203. if err != ErrInvalidTopic {
  204. t.Fatal(err)
  205. }
  206. err = admin.Close()
  207. if err != nil {
  208. t.Fatal(err)
  209. }
  210. }
  211. func TestClusterAdminCreatePartitions(t *testing.T) {
  212. seedBroker := NewMockBroker(t, 1)
  213. defer seedBroker.Close()
  214. seedBroker.SetHandlerByMap(map[string]MockResponse{
  215. "MetadataRequest": NewMockMetadataResponse(t).
  216. SetController(seedBroker.BrokerID()).
  217. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  218. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  219. })
  220. config := NewConfig()
  221. config.Version = V1_0_0_0
  222. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  223. if err != nil {
  224. t.Fatal(err)
  225. }
  226. err = admin.CreatePartitions("my_topic", 3, nil, false)
  227. if err != nil {
  228. t.Fatal(err)
  229. }
  230. err = admin.Close()
  231. if err != nil {
  232. t.Fatal(err)
  233. }
  234. }
  235. func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) {
  236. seedBroker := NewMockBroker(t, 1)
  237. defer seedBroker.Close()
  238. seedBroker.SetHandlerByMap(map[string]MockResponse{
  239. "MetadataRequest": NewMockMetadataResponse(t).
  240. SetController(seedBroker.BrokerID()).
  241. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  242. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  243. })
  244. config := NewConfig()
  245. config.Version = V0_10_2_0
  246. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  247. if err != nil {
  248. t.Fatal(err)
  249. }
  250. err = admin.CreatePartitions("my_topic", 3, nil, false)
  251. if err != ErrUnsupportedVersion {
  252. t.Fatal(err)
  253. }
  254. err = admin.Close()
  255. if err != nil {
  256. t.Fatal(err)
  257. }
  258. }
  259. func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
  260. seedBroker := NewMockBroker(t, 1)
  261. defer seedBroker.Close()
  262. seedBroker.SetHandlerByMap(map[string]MockResponse{
  263. "MetadataRequest": NewMockMetadataResponse(t).
  264. SetController(seedBroker.BrokerID()).
  265. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  266. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  267. })
  268. config := NewConfig()
  269. config.Version = V1_0_0_0
  270. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  271. if err != nil {
  272. t.Fatal(err)
  273. }
  274. err = admin.CreatePartitions("_internal_topic", 3, nil, false)
  275. want := "insufficient permissions to create partition on topic with reserved prefix"
  276. if !strings.HasSuffix(err.Error(), want) {
  277. t.Fatal(err)
  278. }
  279. err = admin.Close()
  280. if err != nil {
  281. t.Fatal(err)
  282. }
  283. }
  284. func TestClusterAdminDeleteRecords(t *testing.T) {
  285. topicName := "my_topic"
  286. seedBroker := NewMockBroker(t, 1)
  287. defer seedBroker.Close()
  288. seedBroker.SetHandlerByMap(map[string]MockResponse{
  289. "MetadataRequest": NewMockMetadataResponse(t).
  290. SetController(seedBroker.BrokerID()).
  291. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  292. SetLeader(topicName, 1, 1).
  293. SetLeader(topicName, 2, 1).
  294. SetLeader(topicName, 3, 1),
  295. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  296. })
  297. config := NewConfig()
  298. config.Version = V1_0_0_0
  299. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  300. if err != nil {
  301. t.Fatal(err)
  302. }
  303. partitionOffsetFake := make(map[int32]int64)
  304. partitionOffsetFake[4] = 1000
  305. errFake := admin.DeleteRecords(topicName, partitionOffsetFake)
  306. if errFake == nil {
  307. t.Fatal(err)
  308. }
  309. partitionOffset := make(map[int32]int64)
  310. partitionOffset[1] = 1000
  311. partitionOffset[2] = 1000
  312. partitionOffset[3] = 1000
  313. err = admin.DeleteRecords(topicName, partitionOffset)
  314. if err != nil {
  315. t.Fatal(err)
  316. }
  317. err = admin.Close()
  318. if err != nil {
  319. t.Fatal(err)
  320. }
  321. }
  322. func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) {
  323. topicName := "my_topic"
  324. seedBroker := NewMockBroker(t, 1)
  325. secondBroker := NewMockBroker(t, 2)
  326. defer seedBroker.Close()
  327. defer secondBroker.Close()
  328. seedBroker.SetHandlerByMap(map[string]MockResponse{
  329. "MetadataRequest": NewMockMetadataResponse(t).
  330. SetController(seedBroker.BrokerID()).
  331. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  332. SetBroker(secondBroker.Addr(), secondBroker.brokerID).
  333. SetLeader(topicName, 1, 1).
  334. SetLeader(topicName, 2, 1).
  335. SetLeader(topicName, 3, 2),
  336. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  337. })
  338. secondBroker.SetHandlerByMap(map[string]MockResponse{
  339. "MetadataRequest": NewMockMetadataResponse(t).
  340. SetController(seedBroker.BrokerID()).
  341. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  342. SetBroker(secondBroker.Addr(), secondBroker.brokerID).
  343. SetLeader(topicName, 1, 1).
  344. SetLeader(topicName, 2, 1).
  345. SetLeader(topicName, 3, 2),
  346. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  347. })
  348. config := NewConfig()
  349. config.Version = V1_0_0_0
  350. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  351. if err != nil {
  352. t.Fatal(err)
  353. }
  354. partitionOffset := make(map[int32]int64)
  355. partitionOffset[1] = 1000
  356. partitionOffset[2] = 1000
  357. partitionOffset[3] = 1000
  358. err = admin.DeleteRecords(topicName, partitionOffset)
  359. if err != nil {
  360. t.Fatal(err)
  361. }
  362. err = admin.Close()
  363. if err != nil {
  364. t.Fatal(err)
  365. }
  366. }
  367. func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
  368. topicName := "my_topic"
  369. seedBroker := NewMockBroker(t, 1)
  370. defer seedBroker.Close()
  371. seedBroker.SetHandlerByMap(map[string]MockResponse{
  372. "MetadataRequest": NewMockMetadataResponse(t).
  373. SetController(seedBroker.BrokerID()).
  374. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  375. SetLeader(topicName, 1, 1).
  376. SetLeader(topicName, 2, 1).
  377. SetLeader(topicName, 3, 1),
  378. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  379. })
  380. config := NewConfig()
  381. config.Version = V0_10_2_0
  382. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  383. if err != nil {
  384. t.Fatal(err)
  385. }
  386. partitionOffset := make(map[int32]int64)
  387. partitionOffset[1] = 1000
  388. partitionOffset[2] = 1000
  389. partitionOffset[3] = 1000
  390. err = admin.DeleteRecords(topicName, partitionOffset)
  391. if !strings.HasPrefix(err.Error(), "kafka server: failed to delete records") {
  392. t.Fatal(err)
  393. }
  394. deleteRecordsError, ok := err.(ErrDeleteRecords)
  395. if !ok {
  396. t.Fatal(err)
  397. }
  398. for _, err := range *deleteRecordsError.Errors {
  399. if err != ErrUnsupportedVersion {
  400. t.Fatal(err)
  401. }
  402. }
  403. err = admin.Close()
  404. if err != nil {
  405. t.Fatal(err)
  406. }
  407. }
  408. func TestClusterAdminDescribeConfig(t *testing.T) {
  409. seedBroker := NewMockBroker(t, 1)
  410. defer seedBroker.Close()
  411. seedBroker.SetHandlerByMap(map[string]MockResponse{
  412. "MetadataRequest": NewMockMetadataResponse(t).
  413. SetController(seedBroker.BrokerID()).
  414. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  415. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  416. })
  417. config := NewConfig()
  418. config.Version = V1_0_0_0
  419. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  420. if err != nil {
  421. t.Fatal(err)
  422. }
  423. resource := ConfigResource{Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}}
  424. entries, err := admin.DescribeConfig(resource)
  425. if err != nil {
  426. t.Fatal(err)
  427. }
  428. if len(entries) <= 0 {
  429. t.Fatal(errors.New("no resource present"))
  430. }
  431. err = admin.Close()
  432. if err != nil {
  433. t.Fatal(err)
  434. }
  435. }
  436. // TestClusterAdminDescribeBrokerConfig ensures that a describe broker config
  437. // is sent to the broker in the resource struct, _not_ the controller
  438. func TestClusterAdminDescribeBrokerConfig(t *testing.T) {
  439. Logger = log.New(os.Stdout, fmt.Sprintf("[%s] ", t.Name()), log.LstdFlags)
  440. defer func() { Logger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags) }()
  441. controllerBroker := NewMockBroker(t, 1)
  442. defer controllerBroker.Close()
  443. configBroker := NewMockBroker(t, 2)
  444. defer configBroker.Close()
  445. controllerBroker.SetHandlerByMap(map[string]MockResponse{
  446. "MetadataRequest": NewMockMetadataResponse(t).
  447. SetController(controllerBroker.BrokerID()).
  448. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
  449. SetBroker(configBroker.Addr(), configBroker.BrokerID()),
  450. })
  451. configBroker.SetHandlerByMap(map[string]MockResponse{
  452. "MetadataRequest": NewMockMetadataResponse(t).
  453. SetController(controllerBroker.BrokerID()).
  454. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
  455. SetBroker(configBroker.Addr(), configBroker.BrokerID()),
  456. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  457. })
  458. config := NewConfig()
  459. config.Version = V1_0_0_0
  460. admin, err := NewClusterAdmin(
  461. []string{
  462. controllerBroker.Addr(),
  463. configBroker.Addr(),
  464. }, config)
  465. if err != nil {
  466. t.Fatal(err)
  467. }
  468. for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
  469. resource := ConfigResource{Name: "2", Type: resourceType}
  470. entries, err := admin.DescribeConfig(resource)
  471. if err != nil {
  472. t.Fatal(err)
  473. }
  474. if len(entries) <= 0 {
  475. t.Fatal(errors.New("no resource present"))
  476. }
  477. }
  478. err = admin.Close()
  479. if err != nil {
  480. t.Fatal(err)
  481. }
  482. }
  483. func TestClusterAdminAlterConfig(t *testing.T) {
  484. seedBroker := NewMockBroker(t, 1)
  485. defer seedBroker.Close()
  486. seedBroker.SetHandlerByMap(map[string]MockResponse{
  487. "MetadataRequest": NewMockMetadataResponse(t).
  488. SetController(seedBroker.BrokerID()).
  489. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  490. "AlterConfigsRequest": NewMockAlterConfigsResponse(t),
  491. })
  492. config := NewConfig()
  493. config.Version = V1_0_0_0
  494. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  495. if err != nil {
  496. t.Fatal(err)
  497. }
  498. var value string
  499. entries := make(map[string]*string)
  500. value = "3"
  501. entries["ReplicationFactor"] = &value
  502. err = admin.AlterConfig(TopicResource, "my_topic", entries, false)
  503. if err != nil {
  504. t.Fatal(err)
  505. }
  506. err = admin.Close()
  507. if err != nil {
  508. t.Fatal(err)
  509. }
  510. }
  511. func TestClusterAdminAlterBrokerConfig(t *testing.T) {
  512. controllerBroker := NewMockBroker(t, 1)
  513. defer controllerBroker.Close()
  514. configBroker := NewMockBroker(t, 2)
  515. defer configBroker.Close()
  516. controllerBroker.SetHandlerByMap(map[string]MockResponse{
  517. "MetadataRequest": NewMockMetadataResponse(t).
  518. SetController(controllerBroker.BrokerID()).
  519. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
  520. SetBroker(configBroker.Addr(), configBroker.BrokerID()),
  521. })
  522. configBroker.SetHandlerByMap(map[string]MockResponse{
  523. "MetadataRequest": NewMockMetadataResponse(t).
  524. SetController(controllerBroker.BrokerID()).
  525. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
  526. SetBroker(configBroker.Addr(), configBroker.BrokerID()),
  527. "AlterConfigsRequest": NewMockAlterConfigsResponse(t),
  528. })
  529. config := NewConfig()
  530. config.Version = V1_0_0_0
  531. admin, err := NewClusterAdmin(
  532. []string{
  533. controllerBroker.Addr(),
  534. configBroker.Addr(),
  535. }, config)
  536. if err != nil {
  537. t.Fatal(err)
  538. }
  539. var value string
  540. entries := make(map[string]*string)
  541. value = "3"
  542. entries["min.insync.replicas"] = &value
  543. for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
  544. resource := ConfigResource{Name: "2", Type: resourceType}
  545. err = admin.AlterConfig(
  546. resource.Type,
  547. resource.Name,
  548. entries,
  549. false)
  550. if err != nil {
  551. t.Fatal(err)
  552. }
  553. }
  554. err = admin.Close()
  555. if err != nil {
  556. t.Fatal(err)
  557. }
  558. }
  559. func TestClusterAdminCreateAcl(t *testing.T) {
  560. seedBroker := NewMockBroker(t, 1)
  561. defer seedBroker.Close()
  562. seedBroker.SetHandlerByMap(map[string]MockResponse{
  563. "MetadataRequest": NewMockMetadataResponse(t).
  564. SetController(seedBroker.BrokerID()).
  565. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  566. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  567. })
  568. config := NewConfig()
  569. config.Version = V1_0_0_0
  570. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  571. if err != nil {
  572. t.Fatal(err)
  573. }
  574. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  575. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  576. err = admin.CreateACL(r, a)
  577. if err != nil {
  578. t.Fatal(err)
  579. }
  580. err = admin.Close()
  581. if err != nil {
  582. t.Fatal(err)
  583. }
  584. }
  585. func TestClusterAdminListAcls(t *testing.T) {
  586. seedBroker := NewMockBroker(t, 1)
  587. defer seedBroker.Close()
  588. seedBroker.SetHandlerByMap(map[string]MockResponse{
  589. "MetadataRequest": NewMockMetadataResponse(t).
  590. SetController(seedBroker.BrokerID()).
  591. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  592. "DescribeAclsRequest": NewMockListAclsResponse(t),
  593. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  594. })
  595. config := NewConfig()
  596. config.Version = V1_0_0_0
  597. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  598. if err != nil {
  599. t.Fatal(err)
  600. }
  601. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  602. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  603. err = admin.CreateACL(r, a)
  604. if err != nil {
  605. t.Fatal(err)
  606. }
  607. resourceName := "my_topic"
  608. filter := AclFilter{
  609. ResourceType: AclResourceTopic,
  610. Operation: AclOperationRead,
  611. ResourceName: &resourceName,
  612. }
  613. rAcls, err := admin.ListAcls(filter)
  614. if err != nil {
  615. t.Fatal(err)
  616. }
  617. if len(rAcls) <= 0 {
  618. t.Fatal("no acls present")
  619. }
  620. err = admin.Close()
  621. if err != nil {
  622. t.Fatal(err)
  623. }
  624. }
  625. func TestClusterAdminDeleteAcl(t *testing.T) {
  626. seedBroker := NewMockBroker(t, 1)
  627. defer seedBroker.Close()
  628. seedBroker.SetHandlerByMap(map[string]MockResponse{
  629. "MetadataRequest": NewMockMetadataResponse(t).
  630. SetController(seedBroker.BrokerID()).
  631. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  632. "DeleteAclsRequest": NewMockDeleteAclsResponse(t),
  633. })
  634. config := NewConfig()
  635. config.Version = V1_0_0_0
  636. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  637. if err != nil {
  638. t.Fatal(err)
  639. }
  640. resourceName := "my_topic"
  641. filter := AclFilter{
  642. ResourceType: AclResourceTopic,
  643. Operation: AclOperationAlter,
  644. ResourceName: &resourceName,
  645. }
  646. _, err = admin.DeleteACL(filter, false)
  647. if err != nil {
  648. t.Fatal(err)
  649. }
  650. err = admin.Close()
  651. if err != nil {
  652. t.Fatal(err)
  653. }
  654. }
  655. func TestDescribeTopic(t *testing.T) {
  656. seedBroker := NewMockBroker(t, 1)
  657. defer seedBroker.Close()
  658. seedBroker.SetHandlerByMap(map[string]MockResponse{
  659. "MetadataRequest": NewMockMetadataResponse(t).
  660. SetController(seedBroker.BrokerID()).
  661. SetLeader("my_topic", 0, seedBroker.BrokerID()).
  662. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  663. })
  664. config := NewConfig()
  665. config.Version = V1_0_0_0
  666. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  667. if err != nil {
  668. t.Fatal(err)
  669. }
  670. topics, err := admin.DescribeTopics([]string{"my_topic"})
  671. if err != nil {
  672. t.Fatal(err)
  673. }
  674. if len(topics) != 1 {
  675. t.Fatalf("Expected 1 result, got %v", len(topics))
  676. }
  677. if topics[0].Name != "my_topic" {
  678. t.Fatalf("Incorrect topic name: %v", topics[0].Name)
  679. }
  680. err = admin.Close()
  681. if err != nil {
  682. t.Fatal(err)
  683. }
  684. }
  685. func TestDescribeTopicWithVersion0_11(t *testing.T) {
  686. seedBroker := NewMockBroker(t, 1)
  687. defer seedBroker.Close()
  688. seedBroker.SetHandlerByMap(map[string]MockResponse{
  689. "MetadataRequest": NewMockMetadataResponse(t).
  690. SetController(seedBroker.BrokerID()).
  691. SetLeader("my_topic", 0, seedBroker.BrokerID()).
  692. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  693. })
  694. config := NewConfig()
  695. config.Version = V0_11_0_0
  696. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  697. if err != nil {
  698. t.Fatal(err)
  699. }
  700. topics, err := admin.DescribeTopics([]string{"my_topic"})
  701. if err != nil {
  702. t.Fatal(err)
  703. }
  704. if len(topics) != 1 {
  705. t.Fatalf("Expected 1 result, got %v", len(topics))
  706. }
  707. if topics[0].Name != "my_topic" {
  708. t.Fatalf("Incorrect topic name: %v", topics[0].Name)
  709. }
  710. err = admin.Close()
  711. if err != nil {
  712. t.Fatal(err)
  713. }
  714. }
  715. func TestDescribeConsumerGroup(t *testing.T) {
  716. seedBroker := NewMockBroker(t, 1)
  717. defer seedBroker.Close()
  718. expectedGroupID := "my-group"
  719. seedBroker.SetHandlerByMap(map[string]MockResponse{
  720. "DescribeGroupsRequest": NewMockDescribeGroupsResponse(t).AddGroupDescription(expectedGroupID, &GroupDescription{
  721. GroupId: expectedGroupID,
  722. }),
  723. "MetadataRequest": NewMockMetadataResponse(t).
  724. SetController(seedBroker.BrokerID()).
  725. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  726. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker),
  727. })
  728. config := NewConfig()
  729. config.Version = V1_0_0_0
  730. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  731. if err != nil {
  732. t.Fatal(err)
  733. }
  734. result, err := admin.DescribeConsumerGroups([]string{expectedGroupID})
  735. if err != nil {
  736. t.Fatal(err)
  737. }
  738. if len(result) != 1 {
  739. t.Fatalf("Expected 1 result, got %v", len(result))
  740. }
  741. if result[0].GroupId != expectedGroupID {
  742. t.Fatalf("Expected groupID %v, got %v", expectedGroupID, result[0].GroupId)
  743. }
  744. err = admin.Close()
  745. if err != nil {
  746. t.Fatal(err)
  747. }
  748. }
  749. func TestListConsumerGroups(t *testing.T) {
  750. seedBroker := NewMockBroker(t, 1)
  751. defer seedBroker.Close()
  752. seedBroker.SetHandlerByMap(map[string]MockResponse{
  753. "MetadataRequest": NewMockMetadataResponse(t).
  754. SetController(seedBroker.BrokerID()).
  755. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  756. "ListGroupsRequest": NewMockListGroupsResponse(t).
  757. AddGroup("my-group", "consumer"),
  758. })
  759. config := NewConfig()
  760. config.Version = V1_0_0_0
  761. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  762. if err != nil {
  763. t.Fatal(err)
  764. }
  765. groups, err := admin.ListConsumerGroups()
  766. if err != nil {
  767. t.Fatal(err)
  768. }
  769. if len(groups) != 1 {
  770. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  771. }
  772. protocolType, ok := groups["my-group"]
  773. if !ok {
  774. t.Fatal("Expected group to be returned, but it did not")
  775. }
  776. if protocolType != "consumer" {
  777. t.Fatalf("Expected protocolType %v, got %v", "consumer", protocolType)
  778. }
  779. err = admin.Close()
  780. if err != nil {
  781. t.Fatal(err)
  782. }
  783. }
  784. func TestListConsumerGroupsMultiBroker(t *testing.T) {
  785. seedBroker := NewMockBroker(t, 1)
  786. defer seedBroker.Close()
  787. secondBroker := NewMockBroker(t, 2)
  788. defer secondBroker.Close()
  789. firstGroup := "first"
  790. secondGroup := "second"
  791. nonExistingGroup := "non-existing-group"
  792. seedBroker.SetHandlerByMap(map[string]MockResponse{
  793. "MetadataRequest": NewMockMetadataResponse(t).
  794. SetController(seedBroker.BrokerID()).
  795. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  796. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  797. "ListGroupsRequest": NewMockListGroupsResponse(t).
  798. AddGroup(firstGroup, "consumer"),
  799. })
  800. secondBroker.SetHandlerByMap(map[string]MockResponse{
  801. "MetadataRequest": NewMockMetadataResponse(t).
  802. SetController(seedBroker.BrokerID()).
  803. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  804. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  805. "ListGroupsRequest": NewMockListGroupsResponse(t).
  806. AddGroup(secondGroup, "consumer"),
  807. })
  808. config := NewConfig()
  809. config.Version = V1_0_0_0
  810. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  811. if err != nil {
  812. t.Fatal(err)
  813. }
  814. groups, err := admin.ListConsumerGroups()
  815. if err != nil {
  816. t.Fatal(err)
  817. }
  818. if len(groups) != 2 {
  819. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  820. }
  821. if _, found := groups[firstGroup]; !found {
  822. t.Fatalf("Expected group %v to be present in result set, but it isn't", firstGroup)
  823. }
  824. if _, found := groups[secondGroup]; !found {
  825. t.Fatalf("Expected group %v to be present in result set, but it isn't", secondGroup)
  826. }
  827. if _, found := groups[nonExistingGroup]; found {
  828. t.Fatalf("Expected group %v to not exist, but it exists", nonExistingGroup)
  829. }
  830. err = admin.Close()
  831. if err != nil {
  832. t.Fatal(err)
  833. }
  834. }
  835. func TestListConsumerGroupOffsets(t *testing.T) {
  836. seedBroker := NewMockBroker(t, 1)
  837. defer seedBroker.Close()
  838. group := "my-group"
  839. topic := "my-topic"
  840. partition := int32(0)
  841. expectedOffset := int64(0)
  842. seedBroker.SetHandlerByMap(map[string]MockResponse{
  843. "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError).SetError(ErrNoError),
  844. "MetadataRequest": NewMockMetadataResponse(t).
  845. SetController(seedBroker.BrokerID()).
  846. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  847. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
  848. })
  849. config := NewConfig()
  850. config.Version = V1_0_0_0
  851. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  852. if err != nil {
  853. t.Fatal(err)
  854. }
  855. response, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{
  856. topic: []int32{0},
  857. })
  858. if err != nil {
  859. t.Fatalf("ListConsumerGroupOffsets failed with error %v", err)
  860. }
  861. block := response.GetBlock(topic, partition)
  862. if block == nil {
  863. t.Fatalf("Expected block for topic %v and partition %v to exist, but it doesn't", topic, partition)
  864. }
  865. if block.Offset != expectedOffset {
  866. t.Fatalf("Expected offset %v, got %v", expectedOffset, block.Offset)
  867. }
  868. err = admin.Close()
  869. if err != nil {
  870. t.Fatal(err)
  871. }
  872. }
  873. func TestDeleteConsumerGroup(t *testing.T) {
  874. seedBroker := NewMockBroker(t, 1)
  875. defer seedBroker.Close()
  876. group := "my-group"
  877. seedBroker.SetHandlerByMap(map[string]MockResponse{
  878. // "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
  879. "DeleteGroupsRequest": NewMockDeleteGroupsRequest(t).SetDeletedGroups([]string{group}),
  880. "MetadataRequest": NewMockMetadataResponse(t).
  881. SetController(seedBroker.BrokerID()).
  882. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  883. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
  884. })
  885. config := NewConfig()
  886. config.Version = V1_1_0_0
  887. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  888. if err != nil {
  889. t.Fatal(err)
  890. }
  891. err = admin.DeleteConsumerGroup(group)
  892. if err != nil {
  893. t.Fatalf("DeleteConsumerGroup failed with error %v", err)
  894. }
  895. }