admin_test.go 21 KB


  1. package sarama
  2. import (
  3. "errors"
  4. "strings"
  5. "testing"
  6. )
  7. func TestClusterAdmin(t *testing.T) {
  8. seedBroker := NewMockBroker(t, 1)
  9. defer seedBroker.Close()
  10. seedBroker.SetHandlerByMap(map[string]MockResponse{
  11. "MetadataRequest": NewMockMetadataResponse(t).
  12. SetController(seedBroker.BrokerID()).
  13. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  14. })
  15. config := NewConfig()
  16. config.Version = V1_0_0_0
  17. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  18. if err != nil {
  19. t.Fatal(err)
  20. }
  21. err = admin.Close()
  22. if err != nil {
  23. t.Fatal(err)
  24. }
  25. }
  26. func TestClusterAdminInvalidController(t *testing.T) {
  27. seedBroker := NewMockBroker(t, 1)
  28. defer seedBroker.Close()
  29. seedBroker.SetHandlerByMap(map[string]MockResponse{
  30. "MetadataRequest": NewMockMetadataResponse(t).
  31. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  32. })
  33. config := NewConfig()
  34. config.Version = V1_0_0_0
  35. _, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  36. if err == nil {
  37. t.Fatal(errors.New("controller not set still cluster admin was created"))
  38. }
  39. if err != ErrControllerNotAvailable {
  40. t.Fatal(err)
  41. }
  42. }
  43. func TestClusterAdminCreateTopic(t *testing.T) {
  44. seedBroker := NewMockBroker(t, 1)
  45. defer seedBroker.Close()
  46. seedBroker.SetHandlerByMap(map[string]MockResponse{
  47. "MetadataRequest": NewMockMetadataResponse(t).
  48. SetController(seedBroker.BrokerID()).
  49. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  50. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  51. })
  52. config := NewConfig()
  53. config.Version = V0_10_2_0
  54. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  55. if err != nil {
  56. t.Fatal(err)
  57. }
  58. err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  59. if err != nil {
  60. t.Fatal(err)
  61. }
  62. err = admin.Close()
  63. if err != nil {
  64. t.Fatal(err)
  65. }
  66. }
  67. func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) {
  68. seedBroker := NewMockBroker(t, 1)
  69. defer seedBroker.Close()
  70. seedBroker.SetHandlerByMap(map[string]MockResponse{
  71. "MetadataRequest": NewMockMetadataResponse(t).
  72. SetController(seedBroker.BrokerID()).
  73. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  74. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  75. })
  76. config := NewConfig()
  77. config.Version = V0_10_2_0
  78. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  79. if err != nil {
  80. t.Fatal(err)
  81. }
  82. err = admin.CreateTopic("my_topic", nil, false)
  83. if err.Error() != "You must specify topic details" {
  84. t.Fatal(err)
  85. }
  86. err = admin.Close()
  87. if err != nil {
  88. t.Fatal(err)
  89. }
  90. }
  91. func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) {
  92. seedBroker := NewMockBroker(t, 1)
  93. defer seedBroker.Close()
  94. seedBroker.SetHandlerByMap(map[string]MockResponse{
  95. "MetadataRequest": NewMockMetadataResponse(t).
  96. SetController(seedBroker.BrokerID()).
  97. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  98. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  99. })
  100. config := NewConfig()
  101. config.Version = V0_11_0_0
  102. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  103. if err != nil {
  104. t.Fatal(err)
  105. }
  106. err = admin.CreateTopic("_internal_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  107. want := "insufficient permissions to create topic with reserved prefix"
  108. if !strings.HasSuffix(err.Error(), want) {
  109. t.Fatal(err)
  110. }
  111. err = admin.Close()
  112. if err != nil {
  113. t.Fatal(err)
  114. }
  115. }
  116. func TestClusterAdminListTopics(t *testing.T) {
  117. seedBroker := NewMockBroker(t, 1)
  118. defer seedBroker.Close()
  119. seedBroker.SetHandlerByMap(map[string]MockResponse{
  120. "MetadataRequest": NewMockMetadataResponse(t).
  121. SetController(seedBroker.BrokerID()).
  122. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  123. SetLeader("my_topic", 0, seedBroker.BrokerID()),
  124. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  125. })
  126. config := NewConfig()
  127. config.Version = V1_0_0_0
  128. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  129. if err != nil {
  130. t.Fatal(err)
  131. }
  132. entries, err := admin.ListTopics()
  133. if err != nil {
  134. t.Fatal(err)
  135. }
  136. if len(entries) <= 0 {
  137. t.Fatal(errors.New("no resource present"))
  138. }
  139. topic, found := entries["my_topic"]
  140. if !found {
  141. t.Fatal(errors.New("topic not found in response"))
  142. }
  143. _, found = topic.ConfigEntries["max.message.bytes"]
  144. if found {
  145. t.Fatal(errors.New("default topic config entry incorrectly found in response"))
  146. }
  147. value, _ := topic.ConfigEntries["retention.ms"]
  148. if value == nil || *value != "5000" {
  149. t.Fatal(errors.New("non-default topic config entry not found in response"))
  150. }
  151. err = admin.Close()
  152. if err != nil {
  153. t.Fatal(err)
  154. }
  155. if topic.ReplicaAssignment == nil || topic.ReplicaAssignment[0][0] != 1 {
  156. t.Fatal(errors.New("replica assignment not found in response"))
  157. }
  158. }
  159. func TestClusterAdminDeleteTopic(t *testing.T) {
  160. seedBroker := NewMockBroker(t, 1)
  161. defer seedBroker.Close()
  162. seedBroker.SetHandlerByMap(map[string]MockResponse{
  163. "MetadataRequest": NewMockMetadataResponse(t).
  164. SetController(seedBroker.BrokerID()).
  165. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  166. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  167. })
  168. config := NewConfig()
  169. config.Version = V0_10_2_0
  170. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  171. if err != nil {
  172. t.Fatal(err)
  173. }
  174. err = admin.DeleteTopic("my_topic")
  175. if err != nil {
  176. t.Fatal(err)
  177. }
  178. err = admin.Close()
  179. if err != nil {
  180. t.Fatal(err)
  181. }
  182. }
  183. func TestClusterAdminDeleteEmptyTopic(t *testing.T) {
  184. seedBroker := NewMockBroker(t, 1)
  185. defer seedBroker.Close()
  186. seedBroker.SetHandlerByMap(map[string]MockResponse{
  187. "MetadataRequest": NewMockMetadataResponse(t).
  188. SetController(seedBroker.BrokerID()).
  189. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  190. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  191. })
  192. config := NewConfig()
  193. config.Version = V0_10_2_0
  194. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  195. if err != nil {
  196. t.Fatal(err)
  197. }
  198. err = admin.DeleteTopic("")
  199. if err != ErrInvalidTopic {
  200. t.Fatal(err)
  201. }
  202. err = admin.Close()
  203. if err != nil {
  204. t.Fatal(err)
  205. }
  206. }
  207. func TestClusterAdminCreatePartitions(t *testing.T) {
  208. seedBroker := NewMockBroker(t, 1)
  209. defer seedBroker.Close()
  210. seedBroker.SetHandlerByMap(map[string]MockResponse{
  211. "MetadataRequest": NewMockMetadataResponse(t).
  212. SetController(seedBroker.BrokerID()).
  213. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  214. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  215. })
  216. config := NewConfig()
  217. config.Version = V1_0_0_0
  218. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  219. if err != nil {
  220. t.Fatal(err)
  221. }
  222. err = admin.CreatePartitions("my_topic", 3, nil, false)
  223. if err != nil {
  224. t.Fatal(err)
  225. }
  226. err = admin.Close()
  227. if err != nil {
  228. t.Fatal(err)
  229. }
  230. }
  231. func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) {
  232. seedBroker := NewMockBroker(t, 1)
  233. defer seedBroker.Close()
  234. seedBroker.SetHandlerByMap(map[string]MockResponse{
  235. "MetadataRequest": NewMockMetadataResponse(t).
  236. SetController(seedBroker.BrokerID()).
  237. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  238. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  239. })
  240. config := NewConfig()
  241. config.Version = V0_10_2_0
  242. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  243. if err != nil {
  244. t.Fatal(err)
  245. }
  246. err = admin.CreatePartitions("my_topic", 3, nil, false)
  247. if err != ErrUnsupportedVersion {
  248. t.Fatal(err)
  249. }
  250. err = admin.Close()
  251. if err != nil {
  252. t.Fatal(err)
  253. }
  254. }
  255. func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
  256. seedBroker := NewMockBroker(t, 1)
  257. defer seedBroker.Close()
  258. seedBroker.SetHandlerByMap(map[string]MockResponse{
  259. "MetadataRequest": NewMockMetadataResponse(t).
  260. SetController(seedBroker.BrokerID()).
  261. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  262. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  263. })
  264. config := NewConfig()
  265. config.Version = V1_0_0_0
  266. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  267. if err != nil {
  268. t.Fatal(err)
  269. }
  270. err = admin.CreatePartitions("_internal_topic", 3, nil, false)
  271. want := "insufficient permissions to create partition on topic with reserved prefix"
  272. if !strings.HasSuffix(err.Error(), want) {
  273. t.Fatal(err)
  274. }
  275. err = admin.Close()
  276. if err != nil {
  277. t.Fatal(err)
  278. }
  279. }
  280. func TestClusterAdminDeleteRecords(t *testing.T) {
  281. seedBroker := NewMockBroker(t, 1)
  282. defer seedBroker.Close()
  283. seedBroker.SetHandlerByMap(map[string]MockResponse{
  284. "MetadataRequest": NewMockMetadataResponse(t).
  285. SetController(seedBroker.BrokerID()).
  286. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  287. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  288. })
  289. config := NewConfig()
  290. config.Version = V1_0_0_0
  291. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  292. if err != nil {
  293. t.Fatal(err)
  294. }
  295. partitionOffset := make(map[int32]int64)
  296. partitionOffset[1] = 1000
  297. partitionOffset[2] = 1000
  298. partitionOffset[3] = 1000
  299. err = admin.DeleteRecords("my_topic", partitionOffset)
  300. if err != nil {
  301. t.Fatal(err)
  302. }
  303. err = admin.Close()
  304. if err != nil {
  305. t.Fatal(err)
  306. }
  307. }
  308. func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
  309. seedBroker := NewMockBroker(t, 1)
  310. defer seedBroker.Close()
  311. seedBroker.SetHandlerByMap(map[string]MockResponse{
  312. "MetadataRequest": NewMockMetadataResponse(t).
  313. SetController(seedBroker.BrokerID()).
  314. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  315. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  316. })
  317. config := NewConfig()
  318. config.Version = V0_10_2_0
  319. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  320. if err != nil {
  321. t.Fatal(err)
  322. }
  323. partitionOffset := make(map[int32]int64)
  324. partitionOffset[1] = 1000
  325. partitionOffset[2] = 1000
  326. partitionOffset[3] = 1000
  327. err = admin.DeleteRecords("my_topic", partitionOffset)
  328. if err != ErrUnsupportedVersion {
  329. t.Fatal(err)
  330. }
  331. err = admin.Close()
  332. if err != nil {
  333. t.Fatal(err)
  334. }
  335. }
  336. func TestClusterAdminDescribeConfig(t *testing.T) {
  337. seedBroker := NewMockBroker(t, 1)
  338. defer seedBroker.Close()
  339. seedBroker.SetHandlerByMap(map[string]MockResponse{
  340. "MetadataRequest": NewMockMetadataResponse(t).
  341. SetController(seedBroker.BrokerID()).
  342. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  343. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  344. })
  345. config := NewConfig()
  346. config.Version = V1_0_0_0
  347. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  348. if err != nil {
  349. t.Fatal(err)
  350. }
  351. resource := ConfigResource{Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}}
  352. entries, err := admin.DescribeConfig(resource)
  353. if err != nil {
  354. t.Fatal(err)
  355. }
  356. if len(entries) <= 0 {
  357. t.Fatal(errors.New("no resource present"))
  358. }
  359. err = admin.Close()
  360. if err != nil {
  361. t.Fatal(err)
  362. }
  363. }
  364. func TestClusterAdminAlterConfig(t *testing.T) {
  365. seedBroker := NewMockBroker(t, 1)
  366. defer seedBroker.Close()
  367. seedBroker.SetHandlerByMap(map[string]MockResponse{
  368. "MetadataRequest": NewMockMetadataResponse(t).
  369. SetController(seedBroker.BrokerID()).
  370. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  371. "AlterConfigsRequest": NewMockAlterConfigsResponse(t),
  372. })
  373. config := NewConfig()
  374. config.Version = V1_0_0_0
  375. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  376. if err != nil {
  377. t.Fatal(err)
  378. }
  379. var value string
  380. entries := make(map[string]*string)
  381. value = "3"
  382. entries["ReplicationFactor"] = &value
  383. err = admin.AlterConfig(TopicResource, "my_topic", entries, false)
  384. if err != nil {
  385. t.Fatal(err)
  386. }
  387. err = admin.Close()
  388. if err != nil {
  389. t.Fatal(err)
  390. }
  391. }
  392. func TestClusterAdminCreateAcl(t *testing.T) {
  393. seedBroker := NewMockBroker(t, 1)
  394. defer seedBroker.Close()
  395. seedBroker.SetHandlerByMap(map[string]MockResponse{
  396. "MetadataRequest": NewMockMetadataResponse(t).
  397. SetController(seedBroker.BrokerID()).
  398. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  399. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  400. })
  401. config := NewConfig()
  402. config.Version = V1_0_0_0
  403. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  404. if err != nil {
  405. t.Fatal(err)
  406. }
  407. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  408. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  409. err = admin.CreateACL(r, a)
  410. if err != nil {
  411. t.Fatal(err)
  412. }
  413. err = admin.Close()
  414. if err != nil {
  415. t.Fatal(err)
  416. }
  417. }
  418. func TestClusterAdminListAcls(t *testing.T) {
  419. seedBroker := NewMockBroker(t, 1)
  420. defer seedBroker.Close()
  421. seedBroker.SetHandlerByMap(map[string]MockResponse{
  422. "MetadataRequest": NewMockMetadataResponse(t).
  423. SetController(seedBroker.BrokerID()).
  424. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  425. "DescribeAclsRequest": NewMockListAclsResponse(t),
  426. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  427. })
  428. config := NewConfig()
  429. config.Version = V1_0_0_0
  430. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  431. if err != nil {
  432. t.Fatal(err)
  433. }
  434. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  435. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  436. err = admin.CreateACL(r, a)
  437. if err != nil {
  438. t.Fatal(err)
  439. }
  440. resourceName := "my_topic"
  441. filter := AclFilter{
  442. ResourceType: AclResourceTopic,
  443. Operation: AclOperationRead,
  444. ResourceName: &resourceName,
  445. }
  446. rAcls, err := admin.ListAcls(filter)
  447. if err != nil {
  448. t.Fatal(err)
  449. }
  450. if len(rAcls) <= 0 {
  451. t.Fatal("no acls present")
  452. }
  453. err = admin.Close()
  454. if err != nil {
  455. t.Fatal(err)
  456. }
  457. }
  458. func TestClusterAdminDeleteAcl(t *testing.T) {
  459. seedBroker := NewMockBroker(t, 1)
  460. defer seedBroker.Close()
  461. seedBroker.SetHandlerByMap(map[string]MockResponse{
  462. "MetadataRequest": NewMockMetadataResponse(t).
  463. SetController(seedBroker.BrokerID()).
  464. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  465. "DeleteAclsRequest": NewMockDeleteAclsResponse(t),
  466. })
  467. config := NewConfig()
  468. config.Version = V1_0_0_0
  469. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  470. if err != nil {
  471. t.Fatal(err)
  472. }
  473. resourceName := "my_topic"
  474. filter := AclFilter{
  475. ResourceType: AclResourceTopic,
  476. Operation: AclOperationAlter,
  477. ResourceName: &resourceName,
  478. }
  479. _, err = admin.DeleteACL(filter, false)
  480. if err != nil {
  481. t.Fatal(err)
  482. }
  483. err = admin.Close()
  484. if err != nil {
  485. t.Fatal(err)
  486. }
  487. }
  488. func TestDescribeTopic(t *testing.T) {
  489. seedBroker := NewMockBroker(t, 1)
  490. defer seedBroker.Close()
  491. seedBroker.SetHandlerByMap(map[string]MockResponse{
  492. "MetadataRequest": NewMockMetadataResponse(t).
  493. SetController(seedBroker.BrokerID()).
  494. SetLeader("my_topic", 0, seedBroker.BrokerID()).
  495. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  496. })
  497. config := NewConfig()
  498. config.Version = V1_0_0_0
  499. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  500. if err != nil {
  501. t.Fatal(err)
  502. }
  503. topics, err := admin.DescribeTopics([]string{"my_topic"})
  504. if err != nil {
  505. t.Fatal(err)
  506. }
  507. if len(topics) != 1 {
  508. t.Fatalf("Expected 1 result, got %v", len(topics))
  509. }
  510. if topics[0].Name != "my_topic" {
  511. t.Fatalf("Incorrect topic name: %v", topics[0].Name)
  512. }
  513. err = admin.Close()
  514. if err != nil {
  515. t.Fatal(err)
  516. }
  517. }
  518. func TestDescribeTopicWithVersion0_11(t *testing.T) {
  519. seedBroker := NewMockBroker(t, 1)
  520. defer seedBroker.Close()
  521. seedBroker.SetHandlerByMap(map[string]MockResponse{
  522. "MetadataRequest": NewMockMetadataResponse(t).
  523. SetController(seedBroker.BrokerID()).
  524. SetLeader("my_topic", 0, seedBroker.BrokerID()).
  525. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  526. })
  527. config := NewConfig()
  528. config.Version = V0_11_0_0
  529. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  530. if err != nil {
  531. t.Fatal(err)
  532. }
  533. topics, err := admin.DescribeTopics([]string{"my_topic"})
  534. if err != nil {
  535. t.Fatal(err)
  536. }
  537. if len(topics) != 1 {
  538. t.Fatalf("Expected 1 result, got %v", len(topics))
  539. }
  540. if topics[0].Name != "my_topic" {
  541. t.Fatalf("Incorrect topic name: %v", topics[0].Name)
  542. }
  543. err = admin.Close()
  544. if err != nil {
  545. t.Fatal(err)
  546. }
  547. }
  548. func TestDescribeConsumerGroup(t *testing.T) {
  549. seedBroker := NewMockBroker(t, 1)
  550. defer seedBroker.Close()
  551. expectedGroupID := "my-group"
  552. seedBroker.SetHandlerByMap(map[string]MockResponse{
  553. "DescribeGroupsRequest": NewMockDescribeGroupsResponse(t).AddGroupDescription(expectedGroupID, &GroupDescription{
  554. GroupId: expectedGroupID,
  555. }),
  556. "MetadataRequest": NewMockMetadataResponse(t).
  557. SetController(seedBroker.BrokerID()).
  558. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  559. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker),
  560. })
  561. config := NewConfig()
  562. config.Version = V1_0_0_0
  563. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  564. if err != nil {
  565. t.Fatal(err)
  566. }
  567. result, err := admin.DescribeConsumerGroups([]string{expectedGroupID})
  568. if err != nil {
  569. t.Fatal(err)
  570. }
  571. if len(result) != 1 {
  572. t.Fatalf("Expected 1 result, got %v", len(result))
  573. }
  574. if result[0].GroupId != expectedGroupID {
  575. t.Fatalf("Expected groupID %v, got %v", expectedGroupID, result[0].GroupId)
  576. }
  577. err = admin.Close()
  578. if err != nil {
  579. t.Fatal(err)
  580. }
  581. }
  582. func TestListConsumerGroups(t *testing.T) {
  583. seedBroker := NewMockBroker(t, 1)
  584. defer seedBroker.Close()
  585. seedBroker.SetHandlerByMap(map[string]MockResponse{
  586. "MetadataRequest": NewMockMetadataResponse(t).
  587. SetController(seedBroker.BrokerID()).
  588. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  589. "ListGroupsRequest": NewMockListGroupsResponse(t).
  590. AddGroup("my-group", "consumer"),
  591. })
  592. config := NewConfig()
  593. config.Version = V1_0_0_0
  594. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  595. if err != nil {
  596. t.Fatal(err)
  597. }
  598. groups, err := admin.ListConsumerGroups()
  599. if err != nil {
  600. t.Fatal(err)
  601. }
  602. if len(groups) != 1 {
  603. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  604. }
  605. protocolType, ok := groups["my-group"]
  606. if !ok {
  607. t.Fatal("Expected group to be returned, but it did not")
  608. }
  609. if protocolType != "consumer" {
  610. t.Fatalf("Expected protocolType %v, got %v", "consumer", protocolType)
  611. }
  612. err = admin.Close()
  613. if err != nil {
  614. t.Fatal(err)
  615. }
  616. }
  617. func TestListConsumerGroupsMultiBroker(t *testing.T) {
  618. seedBroker := NewMockBroker(t, 1)
  619. defer seedBroker.Close()
  620. secondBroker := NewMockBroker(t, 2)
  621. defer secondBroker.Close()
  622. firstGroup := "first"
  623. secondGroup := "second"
  624. nonExistingGroup := "non-existing-group"
  625. seedBroker.SetHandlerByMap(map[string]MockResponse{
  626. "MetadataRequest": NewMockMetadataResponse(t).
  627. SetController(seedBroker.BrokerID()).
  628. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  629. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  630. "ListGroupsRequest": NewMockListGroupsResponse(t).
  631. AddGroup(firstGroup, "consumer"),
  632. })
  633. secondBroker.SetHandlerByMap(map[string]MockResponse{
  634. "MetadataRequest": NewMockMetadataResponse(t).
  635. SetController(seedBroker.BrokerID()).
  636. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  637. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  638. "ListGroupsRequest": NewMockListGroupsResponse(t).
  639. AddGroup(secondGroup, "consumer"),
  640. })
  641. config := NewConfig()
  642. config.Version = V1_0_0_0
  643. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  644. if err != nil {
  645. t.Fatal(err)
  646. }
  647. groups, err := admin.ListConsumerGroups()
  648. if err != nil {
  649. t.Fatal(err)
  650. }
  651. if len(groups) != 2 {
  652. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  653. }
  654. if _, found := groups[firstGroup]; !found {
  655. t.Fatalf("Expected group %v to be present in result set, but it isn't", firstGroup)
  656. }
  657. if _, found := groups[secondGroup]; !found {
  658. t.Fatalf("Expected group %v to be present in result set, but it isn't", secondGroup)
  659. }
  660. if _, found := groups[nonExistingGroup]; found {
  661. t.Fatalf("Expected group %v to not exist, but it exists", nonExistingGroup)
  662. }
  663. err = admin.Close()
  664. if err != nil {
  665. t.Fatal(err)
  666. }
  667. }
  668. func TestListConsumerGroupOffsets(t *testing.T) {
  669. seedBroker := NewMockBroker(t, 1)
  670. defer seedBroker.Close()
  671. group := "my-group"
  672. topic := "my-topic"
  673. partition := int32(0)
  674. expectedOffset := int64(0)
  675. seedBroker.SetHandlerByMap(map[string]MockResponse{
  676. "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
  677. "MetadataRequest": NewMockMetadataResponse(t).
  678. SetController(seedBroker.BrokerID()).
  679. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  680. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
  681. })
  682. config := NewConfig()
  683. config.Version = V1_0_0_0
  684. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  685. if err != nil {
  686. t.Fatal(err)
  687. }
  688. response, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{
  689. topic: []int32{0},
  690. })
  691. if err != nil {
  692. t.Fatalf("ListConsumerGroupOffsets failed with error %v", err)
  693. }
  694. block := response.GetBlock(topic, partition)
  695. if block == nil {
  696. t.Fatalf("Expected block for topic %v and partition %v to exist, but it doesn't", topic, partition)
  697. }
  698. if block.Offset != expectedOffset {
  699. t.Fatalf("Expected offset %v, got %v", expectedOffset, block.Offset)
  700. }
  701. err = admin.Close()
  702. if err != nil {
  703. t.Fatal(err)
  704. }
  705. }