admin_test.go 19 KB


  1. package sarama
  2. import (
  3. "errors"
  4. "testing"
  5. )
  6. func TestClusterAdmin(t *testing.T) {
  7. seedBroker := NewMockBroker(t, 1)
  8. defer seedBroker.Close()
  9. seedBroker.SetHandlerByMap(map[string]MockResponse{
  10. "MetadataRequest": NewMockMetadataResponse(t).
  11. SetController(seedBroker.BrokerID()).
  12. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  13. })
  14. config := NewConfig()
  15. config.Version = V1_0_0_0
  16. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  17. if err != nil {
  18. t.Fatal(err)
  19. }
  20. err = admin.Close()
  21. if err != nil {
  22. t.Fatal(err)
  23. }
  24. }
  25. func TestClusterAdminInvalidController(t *testing.T) {
  26. seedBroker := NewMockBroker(t, 1)
  27. defer seedBroker.Close()
  28. seedBroker.SetHandlerByMap(map[string]MockResponse{
  29. "MetadataRequest": NewMockMetadataResponse(t).
  30. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  31. })
  32. config := NewConfig()
  33. config.Version = V1_0_0_0
  34. _, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  35. if err == nil {
  36. t.Fatal(errors.New("controller not set still cluster admin was created"))
  37. }
  38. if err != ErrControllerNotAvailable {
  39. t.Fatal(err)
  40. }
  41. }
  42. func TestClusterAdminCreateTopic(t *testing.T) {
  43. seedBroker := NewMockBroker(t, 1)
  44. defer seedBroker.Close()
  45. seedBroker.SetHandlerByMap(map[string]MockResponse{
  46. "MetadataRequest": NewMockMetadataResponse(t).
  47. SetController(seedBroker.BrokerID()).
  48. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  49. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  50. })
  51. config := NewConfig()
  52. config.Version = V0_10_2_0
  53. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  54. if err != nil {
  55. t.Fatal(err)
  56. }
  57. err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  58. if err != nil {
  59. t.Fatal(err)
  60. }
  61. err = admin.Close()
  62. if err != nil {
  63. t.Fatal(err)
  64. }
  65. }
  66. func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) {
  67. seedBroker := NewMockBroker(t, 1)
  68. defer seedBroker.Close()
  69. seedBroker.SetHandlerByMap(map[string]MockResponse{
  70. "MetadataRequest": NewMockMetadataResponse(t).
  71. SetController(seedBroker.BrokerID()).
  72. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  73. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  74. })
  75. config := NewConfig()
  76. config.Version = V0_10_2_0
  77. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  78. if err != nil {
  79. t.Fatal(err)
  80. }
  81. err = admin.CreateTopic("my_topic", nil, false)
  82. if err.Error() != "You must specify topic details" {
  83. t.Fatal(err)
  84. }
  85. err = admin.Close()
  86. if err != nil {
  87. t.Fatal(err)
  88. }
  89. }
  90. func TestClusterAdminCreateTopicWithDiffVersion(t *testing.T) {
  91. seedBroker := NewMockBroker(t, 1)
  92. defer seedBroker.Close()
  93. seedBroker.SetHandlerByMap(map[string]MockResponse{
  94. "MetadataRequest": NewMockMetadataResponse(t).
  95. SetController(seedBroker.BrokerID()).
  96. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  97. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  98. })
  99. config := NewConfig()
  100. config.Version = V0_11_0_0
  101. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  102. if err != nil {
  103. t.Fatal(err)
  104. }
  105. err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  106. if err != ErrInsufficientData {
  107. t.Fatal(err)
  108. }
  109. err = admin.Close()
  110. if err != nil {
  111. t.Fatal(err)
  112. }
  113. }
  114. func TestClusterAdminListTopics(t *testing.T) {
  115. seedBroker := NewMockBroker(t, 1)
  116. defer seedBroker.Close()
  117. seedBroker.SetHandlerByMap(map[string]MockResponse{
  118. "MetadataRequest": NewMockMetadataResponse(t).
  119. SetController(seedBroker.BrokerID()).
  120. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  121. SetLeader("my_topic", 0, seedBroker.BrokerID()),
  122. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  123. })
  124. config := NewConfig()
  125. config.Version = V1_0_0_0
  126. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  127. if err != nil {
  128. t.Fatal(err)
  129. }
  130. entries, err := admin.ListTopics()
  131. if err != nil {
  132. t.Fatal(err)
  133. }
  134. if len(entries) <= 0 {
  135. t.Fatal(errors.New("no resource present"))
  136. }
  137. topic, found := entries["my_topic"]
  138. if !found {
  139. t.Fatal(errors.New("topic not found in response"))
  140. }
  141. _, found = topic.ConfigEntries["max.message.bytes"]
  142. if found {
  143. t.Fatal(errors.New("default topic config entry incorrectly found in response"))
  144. }
  145. value, _ := topic.ConfigEntries["retention.ms"]
  146. if value == nil || *value != "5000" {
  147. t.Fatal(errors.New("non-default topic config entry not found in response"))
  148. }
  149. err = admin.Close()
  150. if err != nil {
  151. t.Fatal(err)
  152. }
  153. if topic.ReplicaAssignment == nil || topic.ReplicaAssignment[0][0] != 1 {
  154. t.Fatal(errors.New("replica assignment not found in response"))
  155. }
  156. }
  157. func TestClusterAdminDeleteTopic(t *testing.T) {
  158. seedBroker := NewMockBroker(t, 1)
  159. defer seedBroker.Close()
  160. seedBroker.SetHandlerByMap(map[string]MockResponse{
  161. "MetadataRequest": NewMockMetadataResponse(t).
  162. SetController(seedBroker.BrokerID()).
  163. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  164. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  165. })
  166. config := NewConfig()
  167. config.Version = V0_10_2_0
  168. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  169. if err != nil {
  170. t.Fatal(err)
  171. }
  172. err = admin.DeleteTopic("my_topic")
  173. if err != nil {
  174. t.Fatal(err)
  175. }
  176. err = admin.Close()
  177. if err != nil {
  178. t.Fatal(err)
  179. }
  180. }
  181. func TestClusterAdminDeleteEmptyTopic(t *testing.T) {
  182. seedBroker := NewMockBroker(t, 1)
  183. defer seedBroker.Close()
  184. seedBroker.SetHandlerByMap(map[string]MockResponse{
  185. "MetadataRequest": NewMockMetadataResponse(t).
  186. SetController(seedBroker.BrokerID()).
  187. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  188. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  189. })
  190. config := NewConfig()
  191. config.Version = V0_10_2_0
  192. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  193. if err != nil {
  194. t.Fatal(err)
  195. }
  196. err = admin.DeleteTopic("")
  197. if err != ErrInvalidTopic {
  198. t.Fatal(err)
  199. }
  200. err = admin.Close()
  201. if err != nil {
  202. t.Fatal(err)
  203. }
  204. }
  205. func TestClusterAdminCreatePartitions(t *testing.T) {
  206. seedBroker := NewMockBroker(t, 1)
  207. defer seedBroker.Close()
  208. seedBroker.SetHandlerByMap(map[string]MockResponse{
  209. "MetadataRequest": NewMockMetadataResponse(t).
  210. SetController(seedBroker.BrokerID()).
  211. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  212. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  213. })
  214. config := NewConfig()
  215. config.Version = V1_0_0_0
  216. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  217. if err != nil {
  218. t.Fatal(err)
  219. }
  220. err = admin.CreatePartitions("my_topic", 3, nil, false)
  221. if err != nil {
  222. t.Fatal(err)
  223. }
  224. err = admin.Close()
  225. if err != nil {
  226. t.Fatal(err)
  227. }
  228. }
  229. func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) {
  230. seedBroker := NewMockBroker(t, 1)
  231. defer seedBroker.Close()
  232. seedBroker.SetHandlerByMap(map[string]MockResponse{
  233. "MetadataRequest": NewMockMetadataResponse(t).
  234. SetController(seedBroker.BrokerID()).
  235. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  236. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  237. })
  238. config := NewConfig()
  239. config.Version = V0_10_2_0
  240. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  241. if err != nil {
  242. t.Fatal(err)
  243. }
  244. err = admin.CreatePartitions("my_topic", 3, nil, false)
  245. if err != ErrUnsupportedVersion {
  246. t.Fatal(err)
  247. }
  248. err = admin.Close()
  249. if err != nil {
  250. t.Fatal(err)
  251. }
  252. }
  253. func TestClusterAdminDeleteRecords(t *testing.T) {
  254. seedBroker := NewMockBroker(t, 1)
  255. defer seedBroker.Close()
  256. seedBroker.SetHandlerByMap(map[string]MockResponse{
  257. "MetadataRequest": NewMockMetadataResponse(t).
  258. SetController(seedBroker.BrokerID()).
  259. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  260. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  261. })
  262. config := NewConfig()
  263. config.Version = V1_0_0_0
  264. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  265. if err != nil {
  266. t.Fatal(err)
  267. }
  268. partitionOffset := make(map[int32]int64)
  269. partitionOffset[1] = 1000
  270. partitionOffset[2] = 1000
  271. partitionOffset[3] = 1000
  272. err = admin.DeleteRecords("my_topic", partitionOffset)
  273. if err != nil {
  274. t.Fatal(err)
  275. }
  276. err = admin.Close()
  277. if err != nil {
  278. t.Fatal(err)
  279. }
  280. }
  281. func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
  282. seedBroker := NewMockBroker(t, 1)
  283. defer seedBroker.Close()
  284. seedBroker.SetHandlerByMap(map[string]MockResponse{
  285. "MetadataRequest": NewMockMetadataResponse(t).
  286. SetController(seedBroker.BrokerID()).
  287. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  288. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  289. })
  290. config := NewConfig()
  291. config.Version = V0_10_2_0
  292. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  293. if err != nil {
  294. t.Fatal(err)
  295. }
  296. partitionOffset := make(map[int32]int64)
  297. partitionOffset[1] = 1000
  298. partitionOffset[2] = 1000
  299. partitionOffset[3] = 1000
  300. err = admin.DeleteRecords("my_topic", partitionOffset)
  301. if err != ErrUnsupportedVersion {
  302. t.Fatal(err)
  303. }
  304. err = admin.Close()
  305. if err != nil {
  306. t.Fatal(err)
  307. }
  308. }
  309. func TestClusterAdminDescribeConfig(t *testing.T) {
  310. seedBroker := NewMockBroker(t, 1)
  311. defer seedBroker.Close()
  312. seedBroker.SetHandlerByMap(map[string]MockResponse{
  313. "MetadataRequest": NewMockMetadataResponse(t).
  314. SetController(seedBroker.BrokerID()).
  315. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  316. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  317. })
  318. config := NewConfig()
  319. config.Version = V1_0_0_0
  320. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  321. if err != nil {
  322. t.Fatal(err)
  323. }
  324. resource := ConfigResource{Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}}
  325. entries, err := admin.DescribeConfig(resource)
  326. if err != nil {
  327. t.Fatal(err)
  328. }
  329. if len(entries) <= 0 {
  330. t.Fatal(errors.New("no resource present"))
  331. }
  332. err = admin.Close()
  333. if err != nil {
  334. t.Fatal(err)
  335. }
  336. }
  337. func TestClusterAdminAlterConfig(t *testing.T) {
  338. seedBroker := NewMockBroker(t, 1)
  339. defer seedBroker.Close()
  340. seedBroker.SetHandlerByMap(map[string]MockResponse{
  341. "MetadataRequest": NewMockMetadataResponse(t).
  342. SetController(seedBroker.BrokerID()).
  343. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  344. "AlterConfigsRequest": NewMockAlterConfigsResponse(t),
  345. })
  346. config := NewConfig()
  347. config.Version = V1_0_0_0
  348. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  349. if err != nil {
  350. t.Fatal(err)
  351. }
  352. var value string
  353. entries := make(map[string]*string)
  354. value = "3"
  355. entries["ReplicationFactor"] = &value
  356. err = admin.AlterConfig(TopicResource, "my_topic", entries, false)
  357. if err != nil {
  358. t.Fatal(err)
  359. }
  360. err = admin.Close()
  361. if err != nil {
  362. t.Fatal(err)
  363. }
  364. }
  365. func TestClusterAdminCreateAcl(t *testing.T) {
  366. seedBroker := NewMockBroker(t, 1)
  367. defer seedBroker.Close()
  368. seedBroker.SetHandlerByMap(map[string]MockResponse{
  369. "MetadataRequest": NewMockMetadataResponse(t).
  370. SetController(seedBroker.BrokerID()).
  371. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  372. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  373. })
  374. config := NewConfig()
  375. config.Version = V1_0_0_0
  376. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  377. if err != nil {
  378. t.Fatal(err)
  379. }
  380. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  381. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  382. err = admin.CreateACL(r, a)
  383. if err != nil {
  384. t.Fatal(err)
  385. }
  386. err = admin.Close()
  387. if err != nil {
  388. t.Fatal(err)
  389. }
  390. }
  391. func TestClusterAdminListAcls(t *testing.T) {
  392. seedBroker := NewMockBroker(t, 1)
  393. defer seedBroker.Close()
  394. seedBroker.SetHandlerByMap(map[string]MockResponse{
  395. "MetadataRequest": NewMockMetadataResponse(t).
  396. SetController(seedBroker.BrokerID()).
  397. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  398. "DescribeAclsRequest": NewMockListAclsResponse(t),
  399. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  400. })
  401. config := NewConfig()
  402. config.Version = V1_0_0_0
  403. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  404. if err != nil {
  405. t.Fatal(err)
  406. }
  407. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  408. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  409. err = admin.CreateACL(r, a)
  410. if err != nil {
  411. t.Fatal(err)
  412. }
  413. resourceName := "my_topic"
  414. filter := AclFilter{
  415. ResourceType: AclResourceTopic,
  416. Operation: AclOperationRead,
  417. ResourceName: &resourceName,
  418. }
  419. rAcls, err := admin.ListAcls(filter)
  420. if err != nil {
  421. t.Fatal(err)
  422. }
  423. if len(rAcls) <= 0 {
  424. t.Fatal("no acls present")
  425. }
  426. err = admin.Close()
  427. if err != nil {
  428. t.Fatal(err)
  429. }
  430. }
  431. func TestClusterAdminDeleteAcl(t *testing.T) {
  432. seedBroker := NewMockBroker(t, 1)
  433. defer seedBroker.Close()
  434. seedBroker.SetHandlerByMap(map[string]MockResponse{
  435. "MetadataRequest": NewMockMetadataResponse(t).
  436. SetController(seedBroker.BrokerID()).
  437. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  438. "DeleteAclsRequest": NewMockDeleteAclsResponse(t),
  439. })
  440. config := NewConfig()
  441. config.Version = V1_0_0_0
  442. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  443. if err != nil {
  444. t.Fatal(err)
  445. }
  446. resourceName := "my_topic"
  447. filter := AclFilter{
  448. ResourceType: AclResourceTopic,
  449. Operation: AclOperationAlter,
  450. ResourceName: &resourceName,
  451. }
  452. _, err = admin.DeleteACL(filter, false)
  453. if err != nil {
  454. t.Fatal(err)
  455. }
  456. err = admin.Close()
  457. if err != nil {
  458. t.Fatal(err)
  459. }
  460. }
  461. func TestDescribeTopic(t *testing.T) {
  462. seedBroker := NewMockBroker(t, 1)
  463. defer seedBroker.Close()
  464. seedBroker.SetHandlerByMap(map[string]MockResponse{
  465. "MetadataRequest": NewMockMetadataResponse(t).
  466. SetController(seedBroker.BrokerID()).
  467. SetLeader("my_topic", 0, seedBroker.BrokerID()).
  468. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  469. })
  470. config := NewConfig()
  471. config.Version = V1_0_0_0
  472. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  473. if err != nil {
  474. t.Fatal(err)
  475. }
  476. topics, err := admin.DescribeTopics([]string{"my_topic"})
  477. if err != nil {
  478. t.Fatal(err)
  479. }
  480. if len(topics) != 1 {
  481. t.Fatalf("Expected 1 result, got %v", len(topics))
  482. }
  483. if topics[0].Name != "my_topic" {
  484. t.Fatalf("Incorrect topic name: %v", topics[0].Name)
  485. }
  486. err = admin.Close()
  487. if err != nil {
  488. t.Fatal(err)
  489. }
  490. }
  491. func TestDescribeConsumerGroup(t *testing.T) {
  492. seedBroker := NewMockBroker(t, 1)
  493. defer seedBroker.Close()
  494. expectedGroupID := "my-group"
  495. seedBroker.SetHandlerByMap(map[string]MockResponse{
  496. "DescribeGroupsRequest": NewMockDescribeGroupsResponse(t).AddGroupDescription(expectedGroupID, &GroupDescription{
  497. GroupId: expectedGroupID,
  498. }),
  499. "MetadataRequest": NewMockMetadataResponse(t).
  500. SetController(seedBroker.BrokerID()).
  501. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  502. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker),
  503. })
  504. config := NewConfig()
  505. config.Version = V1_0_0_0
  506. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  507. if err != nil {
  508. t.Fatal(err)
  509. }
  510. result, err := admin.DescribeConsumerGroups([]string{expectedGroupID})
  511. if err != nil {
  512. t.Fatal(err)
  513. }
  514. if len(result) != 1 {
  515. t.Fatalf("Expected 1 result, got %v", len(result))
  516. }
  517. if result[0].GroupId != expectedGroupID {
  518. t.Fatalf("Expected groupID %v, got %v", expectedGroupID, result[0].GroupId)
  519. }
  520. err = admin.Close()
  521. if err != nil {
  522. t.Fatal(err)
  523. }
  524. }
  525. func TestListConsumerGroups(t *testing.T) {
  526. seedBroker := NewMockBroker(t, 1)
  527. defer seedBroker.Close()
  528. seedBroker.SetHandlerByMap(map[string]MockResponse{
  529. "MetadataRequest": NewMockMetadataResponse(t).
  530. SetController(seedBroker.BrokerID()).
  531. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  532. "ListGroupsRequest": NewMockListGroupsResponse(t).
  533. AddGroup("my-group", "consumer"),
  534. })
  535. config := NewConfig()
  536. config.Version = V1_0_0_0
  537. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  538. if err != nil {
  539. t.Fatal(err)
  540. }
  541. groups, err := admin.ListConsumerGroups()
  542. if err != nil {
  543. t.Fatal(err)
  544. }
  545. if len(groups) != 1 {
  546. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  547. }
  548. protocolType, ok := groups["my-group"]
  549. if !ok {
  550. t.Fatal("Expected group to be returned, but it did not")
  551. }
  552. if protocolType != "consumer" {
  553. t.Fatalf("Expected protocolType %v, got %v", "consumer", protocolType)
  554. }
  555. err = admin.Close()
  556. if err != nil {
  557. t.Fatal(err)
  558. }
  559. }
  560. func TestListConsumerGroupsMultiBroker(t *testing.T) {
  561. seedBroker := NewMockBroker(t, 1)
  562. defer seedBroker.Close()
  563. secondBroker := NewMockBroker(t, 2)
  564. defer secondBroker.Close()
  565. firstGroup := "first"
  566. secondGroup := "second"
  567. nonExistingGroup := "non-existing-group"
  568. seedBroker.SetHandlerByMap(map[string]MockResponse{
  569. "MetadataRequest": NewMockMetadataResponse(t).
  570. SetController(seedBroker.BrokerID()).
  571. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  572. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  573. "ListGroupsRequest": NewMockListGroupsResponse(t).
  574. AddGroup(firstGroup, "consumer"),
  575. })
  576. secondBroker.SetHandlerByMap(map[string]MockResponse{
  577. "MetadataRequest": NewMockMetadataResponse(t).
  578. SetController(seedBroker.BrokerID()).
  579. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  580. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  581. "ListGroupsRequest": NewMockListGroupsResponse(t).
  582. AddGroup(secondGroup, "consumer"),
  583. })
  584. config := NewConfig()
  585. config.Version = V1_0_0_0
  586. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  587. if err != nil {
  588. t.Fatal(err)
  589. }
  590. groups, err := admin.ListConsumerGroups()
  591. if err != nil {
  592. t.Fatal(err)
  593. }
  594. if len(groups) != 2 {
  595. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  596. }
  597. if _, found := groups[firstGroup]; !found {
  598. t.Fatalf("Expected group %v to be present in result set, but it isn't", firstGroup)
  599. }
  600. if _, found := groups[secondGroup]; !found {
  601. t.Fatalf("Expected group %v to be present in result set, but it isn't", secondGroup)
  602. }
  603. if _, found := groups[nonExistingGroup]; found {
  604. t.Fatalf("Expected group %v to not exist, but it exists", nonExistingGroup)
  605. }
  606. err = admin.Close()
  607. if err != nil {
  608. t.Fatal(err)
  609. }
  610. }
  611. func TestListConsumerGroupOffsets(t *testing.T) {
  612. seedBroker := NewMockBroker(t, 1)
  613. defer seedBroker.Close()
  614. group := "my-group"
  615. topic := "my-topic"
  616. partition := int32(0)
  617. expectedOffset := int64(0)
  618. seedBroker.SetHandlerByMap(map[string]MockResponse{
  619. "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
  620. "MetadataRequest": NewMockMetadataResponse(t).
  621. SetController(seedBroker.BrokerID()).
  622. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  623. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
  624. })
  625. config := NewConfig()
  626. config.Version = V1_0_0_0
  627. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  628. if err != nil {
  629. t.Fatal(err)
  630. }
  631. response, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{
  632. topic: []int32{0},
  633. })
  634. if err != nil {
  635. t.Fatalf("ListConsumerGroupOffsets failed with error %v", err)
  636. }
  637. block := response.GetBlock(topic, partition)
  638. if block == nil {
  639. t.Fatalf("Expected block for topic %v and partition %v to exist, but it doesn't", topic, partition)
  640. }
  641. if block.Offset != expectedOffset {
  642. t.Fatalf("Expected offset %v, got %v", expectedOffset, block.Offset)
  643. }
  644. err = admin.Close()
  645. if err != nil {
  646. t.Fatal(err)
  647. }
  648. }