admin_test.go 24 KB


  1. package sarama
  2. import (
  3. "errors"
  4. "strings"
  5. "testing"
  6. )
  7. func TestClusterAdmin(t *testing.T) {
  8. seedBroker := NewMockBroker(t, 1)
  9. defer seedBroker.Close()
  10. seedBroker.SetHandlerByMap(map[string]MockResponse{
  11. "MetadataRequest": NewMockMetadataResponse(t).
  12. SetController(seedBroker.BrokerID()).
  13. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  14. })
  15. config := NewConfig()
  16. config.Version = V1_0_0_0
  17. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  18. if err != nil {
  19. t.Fatal(err)
  20. }
  21. err = admin.Close()
  22. if err != nil {
  23. t.Fatal(err)
  24. }
  25. }
  26. func TestClusterAdminInvalidController(t *testing.T) {
  27. seedBroker := NewMockBroker(t, 1)
  28. defer seedBroker.Close()
  29. seedBroker.SetHandlerByMap(map[string]MockResponse{
  30. "MetadataRequest": NewMockMetadataResponse(t).
  31. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  32. })
  33. config := NewConfig()
  34. config.Version = V1_0_0_0
  35. _, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  36. if err == nil {
  37. t.Fatal(errors.New("controller not set still cluster admin was created"))
  38. }
  39. if err != ErrControllerNotAvailable {
  40. t.Fatal(err)
  41. }
  42. }
  43. func TestClusterAdminCreateTopic(t *testing.T) {
  44. seedBroker := NewMockBroker(t, 1)
  45. defer seedBroker.Close()
  46. seedBroker.SetHandlerByMap(map[string]MockResponse{
  47. "MetadataRequest": NewMockMetadataResponse(t).
  48. SetController(seedBroker.BrokerID()).
  49. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  50. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  51. })
  52. config := NewConfig()
  53. config.Version = V0_10_2_0
  54. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  55. if err != nil {
  56. t.Fatal(err)
  57. }
  58. err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  59. if err != nil {
  60. t.Fatal(err)
  61. }
  62. err = admin.Close()
  63. if err != nil {
  64. t.Fatal(err)
  65. }
  66. }
  67. func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) {
  68. seedBroker := NewMockBroker(t, 1)
  69. defer seedBroker.Close()
  70. seedBroker.SetHandlerByMap(map[string]MockResponse{
  71. "MetadataRequest": NewMockMetadataResponse(t).
  72. SetController(seedBroker.BrokerID()).
  73. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  74. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  75. })
  76. config := NewConfig()
  77. config.Version = V0_10_2_0
  78. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  79. if err != nil {
  80. t.Fatal(err)
  81. }
  82. err = admin.CreateTopic("my_topic", nil, false)
  83. if err.Error() != "you must specify topic details" {
  84. t.Fatal(err)
  85. }
  86. err = admin.Close()
  87. if err != nil {
  88. t.Fatal(err)
  89. }
  90. }
  91. func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) {
  92. seedBroker := NewMockBroker(t, 1)
  93. defer seedBroker.Close()
  94. seedBroker.SetHandlerByMap(map[string]MockResponse{
  95. "MetadataRequest": NewMockMetadataResponse(t).
  96. SetController(seedBroker.BrokerID()).
  97. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  98. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  99. })
  100. config := NewConfig()
  101. config.Version = V0_11_0_0
  102. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  103. if err != nil {
  104. t.Fatal(err)
  105. }
  106. err = admin.CreateTopic("_internal_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  107. want := "insufficient permissions to create topic with reserved prefix"
  108. if !strings.HasSuffix(err.Error(), want) {
  109. t.Fatal(err)
  110. }
  111. err = admin.Close()
  112. if err != nil {
  113. t.Fatal(err)
  114. }
  115. }
  116. func TestClusterAdminListTopics(t *testing.T) {
  117. seedBroker := NewMockBroker(t, 1)
  118. defer seedBroker.Close()
  119. seedBroker.SetHandlerByMap(map[string]MockResponse{
  120. "MetadataRequest": NewMockMetadataResponse(t).
  121. SetController(seedBroker.BrokerID()).
  122. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  123. SetLeader("my_topic", 0, seedBroker.BrokerID()),
  124. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  125. })
  126. config := NewConfig()
  127. config.Version = V1_0_0_0
  128. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  129. if err != nil {
  130. t.Fatal(err)
  131. }
  132. entries, err := admin.ListTopics()
  133. if err != nil {
  134. t.Fatal(err)
  135. }
  136. if len(entries) <= 0 {
  137. t.Fatal(errors.New("no resource present"))
  138. }
  139. topic, found := entries["my_topic"]
  140. if !found {
  141. t.Fatal(errors.New("topic not found in response"))
  142. }
  143. _, found = topic.ConfigEntries["max.message.bytes"]
  144. if found {
  145. t.Fatal(errors.New("default topic config entry incorrectly found in response"))
  146. }
  147. value, _ := topic.ConfigEntries["retention.ms"]
  148. if value == nil || *value != "5000" {
  149. t.Fatal(errors.New("non-default topic config entry not found in response"))
  150. }
  151. err = admin.Close()
  152. if err != nil {
  153. t.Fatal(err)
  154. }
  155. if topic.ReplicaAssignment == nil || topic.ReplicaAssignment[0][0] != 1 {
  156. t.Fatal(errors.New("replica assignment not found in response"))
  157. }
  158. }
  159. func TestClusterAdminDeleteTopic(t *testing.T) {
  160. seedBroker := NewMockBroker(t, 1)
  161. defer seedBroker.Close()
  162. seedBroker.SetHandlerByMap(map[string]MockResponse{
  163. "MetadataRequest": NewMockMetadataResponse(t).
  164. SetController(seedBroker.BrokerID()).
  165. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  166. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  167. })
  168. config := NewConfig()
  169. config.Version = V0_10_2_0
  170. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  171. if err != nil {
  172. t.Fatal(err)
  173. }
  174. err = admin.DeleteTopic("my_topic")
  175. if err != nil {
  176. t.Fatal(err)
  177. }
  178. err = admin.Close()
  179. if err != nil {
  180. t.Fatal(err)
  181. }
  182. }
  183. func TestClusterAdminDeleteEmptyTopic(t *testing.T) {
  184. seedBroker := NewMockBroker(t, 1)
  185. defer seedBroker.Close()
  186. seedBroker.SetHandlerByMap(map[string]MockResponse{
  187. "MetadataRequest": NewMockMetadataResponse(t).
  188. SetController(seedBroker.BrokerID()).
  189. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  190. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  191. })
  192. config := NewConfig()
  193. config.Version = V0_10_2_0
  194. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  195. if err != nil {
  196. t.Fatal(err)
  197. }
  198. err = admin.DeleteTopic("")
  199. if err != ErrInvalidTopic {
  200. t.Fatal(err)
  201. }
  202. err = admin.Close()
  203. if err != nil {
  204. t.Fatal(err)
  205. }
  206. }
  207. func TestClusterAdminCreatePartitions(t *testing.T) {
  208. seedBroker := NewMockBroker(t, 1)
  209. defer seedBroker.Close()
  210. seedBroker.SetHandlerByMap(map[string]MockResponse{
  211. "MetadataRequest": NewMockMetadataResponse(t).
  212. SetController(seedBroker.BrokerID()).
  213. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  214. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  215. })
  216. config := NewConfig()
  217. config.Version = V1_0_0_0
  218. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  219. if err != nil {
  220. t.Fatal(err)
  221. }
  222. err = admin.CreatePartitions("my_topic", 3, nil, false)
  223. if err != nil {
  224. t.Fatal(err)
  225. }
  226. err = admin.Close()
  227. if err != nil {
  228. t.Fatal(err)
  229. }
  230. }
  231. func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) {
  232. seedBroker := NewMockBroker(t, 1)
  233. defer seedBroker.Close()
  234. seedBroker.SetHandlerByMap(map[string]MockResponse{
  235. "MetadataRequest": NewMockMetadataResponse(t).
  236. SetController(seedBroker.BrokerID()).
  237. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  238. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  239. })
  240. config := NewConfig()
  241. config.Version = V0_10_2_0
  242. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  243. if err != nil {
  244. t.Fatal(err)
  245. }
  246. err = admin.CreatePartitions("my_topic", 3, nil, false)
  247. if err != ErrUnsupportedVersion {
  248. t.Fatal(err)
  249. }
  250. err = admin.Close()
  251. if err != nil {
  252. t.Fatal(err)
  253. }
  254. }
  255. func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
  256. seedBroker := NewMockBroker(t, 1)
  257. defer seedBroker.Close()
  258. seedBroker.SetHandlerByMap(map[string]MockResponse{
  259. "MetadataRequest": NewMockMetadataResponse(t).
  260. SetController(seedBroker.BrokerID()).
  261. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  262. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  263. })
  264. config := NewConfig()
  265. config.Version = V1_0_0_0
  266. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  267. if err != nil {
  268. t.Fatal(err)
  269. }
  270. err = admin.CreatePartitions("_internal_topic", 3, nil, false)
  271. want := "insufficient permissions to create partition on topic with reserved prefix"
  272. if !strings.HasSuffix(err.Error(), want) {
  273. t.Fatal(err)
  274. }
  275. err = admin.Close()
  276. if err != nil {
  277. t.Fatal(err)
  278. }
  279. }
  280. func TestClusterAdminDeleteRecords(t *testing.T) {
  281. topicName := "my_topic"
  282. seedBroker := NewMockBroker(t, 1)
  283. defer seedBroker.Close()
  284. seedBroker.SetHandlerByMap(map[string]MockResponse{
  285. "MetadataRequest": NewMockMetadataResponse(t).
  286. SetController(seedBroker.BrokerID()).
  287. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  288. SetLeader(topicName, 1, 1).
  289. SetLeader(topicName, 2, 1).
  290. SetLeader(topicName, 3, 1),
  291. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  292. })
  293. config := NewConfig()
  294. config.Version = V1_0_0_0
  295. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  296. if err != nil {
  297. t.Fatal(err)
  298. }
  299. partitionOffsetFake := make(map[int32]int64)
  300. partitionOffsetFake[4] = 1000
  301. errFake := admin.DeleteRecords(topicName, partitionOffsetFake)
  302. if errFake == nil {
  303. t.Fatal(err)
  304. }
  305. partitionOffset := make(map[int32]int64)
  306. partitionOffset[1] = 1000
  307. partitionOffset[2] = 1000
  308. partitionOffset[3] = 1000
  309. err = admin.DeleteRecords(topicName, partitionOffset)
  310. if err != nil {
  311. t.Fatal(err)
  312. }
  313. err = admin.Close()
  314. if err != nil {
  315. t.Fatal(err)
  316. }
  317. }
  318. func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) {
  319. topicName := "my_topic"
  320. seedBroker := NewMockBroker(t, 1)
  321. secondBroker := NewMockBroker(t, 2)
  322. defer seedBroker.Close()
  323. defer secondBroker.Close()
  324. seedBroker.SetHandlerByMap(map[string]MockResponse{
  325. "MetadataRequest": NewMockMetadataResponse(t).
  326. SetController(seedBroker.BrokerID()).
  327. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  328. SetBroker(secondBroker.Addr(), secondBroker.brokerID).
  329. SetLeader(topicName, 1, 1).
  330. SetLeader(topicName, 2, 1).
  331. SetLeader(topicName, 3, 2),
  332. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  333. })
  334. secondBroker.SetHandlerByMap(map[string]MockResponse{
  335. "MetadataRequest": NewMockMetadataResponse(t).
  336. SetController(seedBroker.BrokerID()).
  337. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  338. SetBroker(secondBroker.Addr(), secondBroker.brokerID).
  339. SetLeader(topicName, 1, 1).
  340. SetLeader(topicName, 2, 1).
  341. SetLeader(topicName, 3, 2),
  342. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  343. })
  344. config := NewConfig()
  345. config.Version = V1_0_0_0
  346. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  347. if err != nil {
  348. t.Fatal(err)
  349. }
  350. partitionOffset := make(map[int32]int64)
  351. partitionOffset[1] = 1000
  352. partitionOffset[2] = 1000
  353. partitionOffset[3] = 1000
  354. err = admin.DeleteRecords(topicName, partitionOffset)
  355. if err != nil {
  356. t.Fatal(err)
  357. }
  358. err = admin.Close()
  359. if err != nil {
  360. t.Fatal(err)
  361. }
  362. }
  363. func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
  364. topicName := "my_topic"
  365. seedBroker := NewMockBroker(t, 1)
  366. defer seedBroker.Close()
  367. seedBroker.SetHandlerByMap(map[string]MockResponse{
  368. "MetadataRequest": NewMockMetadataResponse(t).
  369. SetController(seedBroker.BrokerID()).
  370. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  371. SetLeader(topicName, 1, 1).
  372. SetLeader(topicName, 2, 1).
  373. SetLeader(topicName, 3, 1),
  374. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  375. })
  376. config := NewConfig()
  377. config.Version = V0_10_2_0
  378. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  379. if err != nil {
  380. t.Fatal(err)
  381. }
  382. partitionOffset := make(map[int32]int64)
  383. partitionOffset[1] = 1000
  384. partitionOffset[2] = 1000
  385. partitionOffset[3] = 1000
  386. err = admin.DeleteRecords(topicName, partitionOffset)
  387. if !strings.HasPrefix(err.Error(), "kafka server: failed to delete records") {
  388. t.Fatal(err)
  389. }
  390. deleteRecordsError, ok := err.(ErrDeleteRecords)
  391. if !ok {
  392. t.Fatal(err)
  393. }
  394. for _, err := range *deleteRecordsError.Errors {
  395. if err != ErrUnsupportedVersion {
  396. t.Fatal(err)
  397. }
  398. }
  399. err = admin.Close()
  400. if err != nil {
  401. t.Fatal(err)
  402. }
  403. }
  404. func TestClusterAdminDescribeConfig(t *testing.T) {
  405. seedBroker := NewMockBroker(t, 1)
  406. defer seedBroker.Close()
  407. seedBroker.SetHandlerByMap(map[string]MockResponse{
  408. "MetadataRequest": NewMockMetadataResponse(t).
  409. SetController(seedBroker.BrokerID()).
  410. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  411. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  412. })
  413. config := NewConfig()
  414. config.Version = V1_0_0_0
  415. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  416. if err != nil {
  417. t.Fatal(err)
  418. }
  419. resource := ConfigResource{Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}}
  420. entries, err := admin.DescribeConfig(resource)
  421. if err != nil {
  422. t.Fatal(err)
  423. }
  424. if len(entries) <= 0 {
  425. t.Fatal(errors.New("no resource present"))
  426. }
  427. err = admin.Close()
  428. if err != nil {
  429. t.Fatal(err)
  430. }
  431. }
  432. func TestClusterAdminAlterConfig(t *testing.T) {
  433. seedBroker := NewMockBroker(t, 1)
  434. defer seedBroker.Close()
  435. seedBroker.SetHandlerByMap(map[string]MockResponse{
  436. "MetadataRequest": NewMockMetadataResponse(t).
  437. SetController(seedBroker.BrokerID()).
  438. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  439. "AlterConfigsRequest": NewMockAlterConfigsResponse(t),
  440. })
  441. config := NewConfig()
  442. config.Version = V1_0_0_0
  443. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  444. if err != nil {
  445. t.Fatal(err)
  446. }
  447. var value string
  448. entries := make(map[string]*string)
  449. value = "3"
  450. entries["ReplicationFactor"] = &value
  451. err = admin.AlterConfig(TopicResource, "my_topic", entries, false)
  452. if err != nil {
  453. t.Fatal(err)
  454. }
  455. err = admin.Close()
  456. if err != nil {
  457. t.Fatal(err)
  458. }
  459. }
  460. func TestClusterAdminCreateAcl(t *testing.T) {
  461. seedBroker := NewMockBroker(t, 1)
  462. defer seedBroker.Close()
  463. seedBroker.SetHandlerByMap(map[string]MockResponse{
  464. "MetadataRequest": NewMockMetadataResponse(t).
  465. SetController(seedBroker.BrokerID()).
  466. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  467. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  468. })
  469. config := NewConfig()
  470. config.Version = V1_0_0_0
  471. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  472. if err != nil {
  473. t.Fatal(err)
  474. }
  475. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  476. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  477. err = admin.CreateACL(r, a)
  478. if err != nil {
  479. t.Fatal(err)
  480. }
  481. err = admin.Close()
  482. if err != nil {
  483. t.Fatal(err)
  484. }
  485. }
  486. func TestClusterAdminListAcls(t *testing.T) {
  487. seedBroker := NewMockBroker(t, 1)
  488. defer seedBroker.Close()
  489. seedBroker.SetHandlerByMap(map[string]MockResponse{
  490. "MetadataRequest": NewMockMetadataResponse(t).
  491. SetController(seedBroker.BrokerID()).
  492. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  493. "DescribeAclsRequest": NewMockListAclsResponse(t),
  494. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  495. })
  496. config := NewConfig()
  497. config.Version = V1_0_0_0
  498. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  499. if err != nil {
  500. t.Fatal(err)
  501. }
  502. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  503. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  504. err = admin.CreateACL(r, a)
  505. if err != nil {
  506. t.Fatal(err)
  507. }
  508. resourceName := "my_topic"
  509. filter := AclFilter{
  510. ResourceType: AclResourceTopic,
  511. Operation: AclOperationRead,
  512. ResourceName: &resourceName,
  513. }
  514. rAcls, err := admin.ListAcls(filter)
  515. if err != nil {
  516. t.Fatal(err)
  517. }
  518. if len(rAcls) <= 0 {
  519. t.Fatal("no acls present")
  520. }
  521. err = admin.Close()
  522. if err != nil {
  523. t.Fatal(err)
  524. }
  525. }
  526. func TestClusterAdminDeleteAcl(t *testing.T) {
  527. seedBroker := NewMockBroker(t, 1)
  528. defer seedBroker.Close()
  529. seedBroker.SetHandlerByMap(map[string]MockResponse{
  530. "MetadataRequest": NewMockMetadataResponse(t).
  531. SetController(seedBroker.BrokerID()).
  532. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  533. "DeleteAclsRequest": NewMockDeleteAclsResponse(t),
  534. })
  535. config := NewConfig()
  536. config.Version = V1_0_0_0
  537. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  538. if err != nil {
  539. t.Fatal(err)
  540. }
  541. resourceName := "my_topic"
  542. filter := AclFilter{
  543. ResourceType: AclResourceTopic,
  544. Operation: AclOperationAlter,
  545. ResourceName: &resourceName,
  546. }
  547. _, err = admin.DeleteACL(filter, false)
  548. if err != nil {
  549. t.Fatal(err)
  550. }
  551. err = admin.Close()
  552. if err != nil {
  553. t.Fatal(err)
  554. }
  555. }
  556. func TestDescribeTopic(t *testing.T) {
  557. seedBroker := NewMockBroker(t, 1)
  558. defer seedBroker.Close()
  559. seedBroker.SetHandlerByMap(map[string]MockResponse{
  560. "MetadataRequest": NewMockMetadataResponse(t).
  561. SetController(seedBroker.BrokerID()).
  562. SetLeader("my_topic", 0, seedBroker.BrokerID()).
  563. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  564. })
  565. config := NewConfig()
  566. config.Version = V1_0_0_0
  567. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  568. if err != nil {
  569. t.Fatal(err)
  570. }
  571. topics, err := admin.DescribeTopics([]string{"my_topic"})
  572. if err != nil {
  573. t.Fatal(err)
  574. }
  575. if len(topics) != 1 {
  576. t.Fatalf("Expected 1 result, got %v", len(topics))
  577. }
  578. if topics[0].Name != "my_topic" {
  579. t.Fatalf("Incorrect topic name: %v", topics[0].Name)
  580. }
  581. err = admin.Close()
  582. if err != nil {
  583. t.Fatal(err)
  584. }
  585. }
  586. func TestDescribeTopicWithVersion0_11(t *testing.T) {
  587. seedBroker := NewMockBroker(t, 1)
  588. defer seedBroker.Close()
  589. seedBroker.SetHandlerByMap(map[string]MockResponse{
  590. "MetadataRequest": NewMockMetadataResponse(t).
  591. SetController(seedBroker.BrokerID()).
  592. SetLeader("my_topic", 0, seedBroker.BrokerID()).
  593. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  594. })
  595. config := NewConfig()
  596. config.Version = V0_11_0_0
  597. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  598. if err != nil {
  599. t.Fatal(err)
  600. }
  601. topics, err := admin.DescribeTopics([]string{"my_topic"})
  602. if err != nil {
  603. t.Fatal(err)
  604. }
  605. if len(topics) != 1 {
  606. t.Fatalf("Expected 1 result, got %v", len(topics))
  607. }
  608. if topics[0].Name != "my_topic" {
  609. t.Fatalf("Incorrect topic name: %v", topics[0].Name)
  610. }
  611. err = admin.Close()
  612. if err != nil {
  613. t.Fatal(err)
  614. }
  615. }
  616. func TestDescribeConsumerGroup(t *testing.T) {
  617. seedBroker := NewMockBroker(t, 1)
  618. defer seedBroker.Close()
  619. expectedGroupID := "my-group"
  620. seedBroker.SetHandlerByMap(map[string]MockResponse{
  621. "DescribeGroupsRequest": NewMockDescribeGroupsResponse(t).AddGroupDescription(expectedGroupID, &GroupDescription{
  622. GroupId: expectedGroupID,
  623. }),
  624. "MetadataRequest": NewMockMetadataResponse(t).
  625. SetController(seedBroker.BrokerID()).
  626. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  627. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker),
  628. })
  629. config := NewConfig()
  630. config.Version = V1_0_0_0
  631. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  632. if err != nil {
  633. t.Fatal(err)
  634. }
  635. result, err := admin.DescribeConsumerGroups([]string{expectedGroupID})
  636. if err != nil {
  637. t.Fatal(err)
  638. }
  639. if len(result) != 1 {
  640. t.Fatalf("Expected 1 result, got %v", len(result))
  641. }
  642. if result[0].GroupId != expectedGroupID {
  643. t.Fatalf("Expected groupID %v, got %v", expectedGroupID, result[0].GroupId)
  644. }
  645. err = admin.Close()
  646. if err != nil {
  647. t.Fatal(err)
  648. }
  649. }
  650. func TestListConsumerGroups(t *testing.T) {
  651. seedBroker := NewMockBroker(t, 1)
  652. defer seedBroker.Close()
  653. seedBroker.SetHandlerByMap(map[string]MockResponse{
  654. "MetadataRequest": NewMockMetadataResponse(t).
  655. SetController(seedBroker.BrokerID()).
  656. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  657. "ListGroupsRequest": NewMockListGroupsResponse(t).
  658. AddGroup("my-group", "consumer"),
  659. })
  660. config := NewConfig()
  661. config.Version = V1_0_0_0
  662. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  663. if err != nil {
  664. t.Fatal(err)
  665. }
  666. groups, err := admin.ListConsumerGroups()
  667. if err != nil {
  668. t.Fatal(err)
  669. }
  670. if len(groups) != 1 {
  671. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  672. }
  673. protocolType, ok := groups["my-group"]
  674. if !ok {
  675. t.Fatal("Expected group to be returned, but it did not")
  676. }
  677. if protocolType != "consumer" {
  678. t.Fatalf("Expected protocolType %v, got %v", "consumer", protocolType)
  679. }
  680. err = admin.Close()
  681. if err != nil {
  682. t.Fatal(err)
  683. }
  684. }
  685. func TestListConsumerGroupsMultiBroker(t *testing.T) {
  686. seedBroker := NewMockBroker(t, 1)
  687. defer seedBroker.Close()
  688. secondBroker := NewMockBroker(t, 2)
  689. defer secondBroker.Close()
  690. firstGroup := "first"
  691. secondGroup := "second"
  692. nonExistingGroup := "non-existing-group"
  693. seedBroker.SetHandlerByMap(map[string]MockResponse{
  694. "MetadataRequest": NewMockMetadataResponse(t).
  695. SetController(seedBroker.BrokerID()).
  696. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  697. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  698. "ListGroupsRequest": NewMockListGroupsResponse(t).
  699. AddGroup(firstGroup, "consumer"),
  700. })
  701. secondBroker.SetHandlerByMap(map[string]MockResponse{
  702. "MetadataRequest": NewMockMetadataResponse(t).
  703. SetController(seedBroker.BrokerID()).
  704. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  705. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  706. "ListGroupsRequest": NewMockListGroupsResponse(t).
  707. AddGroup(secondGroup, "consumer"),
  708. })
  709. config := NewConfig()
  710. config.Version = V1_0_0_0
  711. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  712. if err != nil {
  713. t.Fatal(err)
  714. }
  715. groups, err := admin.ListConsumerGroups()
  716. if err != nil {
  717. t.Fatal(err)
  718. }
  719. if len(groups) != 2 {
  720. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  721. }
  722. if _, found := groups[firstGroup]; !found {
  723. t.Fatalf("Expected group %v to be present in result set, but it isn't", firstGroup)
  724. }
  725. if _, found := groups[secondGroup]; !found {
  726. t.Fatalf("Expected group %v to be present in result set, but it isn't", secondGroup)
  727. }
  728. if _, found := groups[nonExistingGroup]; found {
  729. t.Fatalf("Expected group %v to not exist, but it exists", nonExistingGroup)
  730. }
  731. err = admin.Close()
  732. if err != nil {
  733. t.Fatal(err)
  734. }
  735. }
  736. func TestListConsumerGroupOffsets(t *testing.T) {
  737. seedBroker := NewMockBroker(t, 1)
  738. defer seedBroker.Close()
  739. group := "my-group"
  740. topic := "my-topic"
  741. partition := int32(0)
  742. expectedOffset := int64(0)
  743. seedBroker.SetHandlerByMap(map[string]MockResponse{
  744. "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError).SetError(ErrNoError),
  745. "MetadataRequest": NewMockMetadataResponse(t).
  746. SetController(seedBroker.BrokerID()).
  747. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  748. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
  749. })
  750. config := NewConfig()
  751. config.Version = V1_0_0_0
  752. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  753. if err != nil {
  754. t.Fatal(err)
  755. }
  756. response, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{
  757. topic: []int32{0},
  758. })
  759. if err != nil {
  760. t.Fatalf("ListConsumerGroupOffsets failed with error %v", err)
  761. }
  762. block := response.GetBlock(topic, partition)
  763. if block == nil {
  764. t.Fatalf("Expected block for topic %v and partition %v to exist, but it doesn't", topic, partition)
  765. }
  766. if block.Offset != expectedOffset {
  767. t.Fatalf("Expected offset %v, got %v", expectedOffset, block.Offset)
  768. }
  769. err = admin.Close()
  770. if err != nil {
  771. t.Fatal(err)
  772. }
  773. }
  774. func TestDeleteConsumerGroup(t *testing.T) {
  775. seedBroker := NewMockBroker(t, 1)
  776. defer seedBroker.Close()
  777. group := "my-group"
  778. seedBroker.SetHandlerByMap(map[string]MockResponse{
  779. // "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
  780. "DeleteGroupsRequest": NewMockDeleteGroupsRequest(t).SetDeletedGroups([]string{group}),
  781. "MetadataRequest": NewMockMetadataResponse(t).
  782. SetController(seedBroker.BrokerID()).
  783. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  784. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
  785. })
  786. config := NewConfig()
  787. config.Version = V1_1_0_0
  788. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  789. if err != nil {
  790. t.Fatal(err)
  791. }
  792. err = admin.DeleteConsumerGroup(group)
  793. if err != nil {
  794. t.Fatalf("DeleteConsumerGroup failed with error %v", err)
  795. }
  796. }