admin_test.go 20 KB


  1. package sarama
  2. import (
  3. "errors"
  4. "strings"
  5. "testing"
  6. )
  7. func TestClusterAdmin(t *testing.T) {
  8. seedBroker := NewMockBroker(t, 1)
  9. defer seedBroker.Close()
  10. seedBroker.SetHandlerByMap(map[string]MockResponse{
  11. "MetadataRequest": NewMockMetadataResponse(t).
  12. SetController(seedBroker.BrokerID()).
  13. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  14. })
  15. config := NewConfig()
  16. config.Version = V1_0_0_0
  17. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  18. if err != nil {
  19. t.Fatal(err)
  20. }
  21. err = admin.Close()
  22. if err != nil {
  23. t.Fatal(err)
  24. }
  25. }
  26. func TestClusterAdminInvalidController(t *testing.T) {
  27. seedBroker := NewMockBroker(t, 1)
  28. defer seedBroker.Close()
  29. seedBroker.SetHandlerByMap(map[string]MockResponse{
  30. "MetadataRequest": NewMockMetadataResponse(t).
  31. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  32. })
  33. config := NewConfig()
  34. config.Version = V1_0_0_0
  35. _, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  36. if err == nil {
  37. t.Fatal(errors.New("controller not set still cluster admin was created"))
  38. }
  39. if err != ErrControllerNotAvailable {
  40. t.Fatal(err)
  41. }
  42. }
  43. func TestClusterAdminCreateTopic(t *testing.T) {
  44. seedBroker := NewMockBroker(t, 1)
  45. defer seedBroker.Close()
  46. seedBroker.SetHandlerByMap(map[string]MockResponse{
  47. "MetadataRequest": NewMockMetadataResponse(t).
  48. SetController(seedBroker.BrokerID()).
  49. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  50. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  51. })
  52. config := NewConfig()
  53. config.Version = V0_10_2_0
  54. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  55. if err != nil {
  56. t.Fatal(err)
  57. }
  58. err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  59. if err != nil {
  60. t.Fatal(err)
  61. }
  62. err = admin.Close()
  63. if err != nil {
  64. t.Fatal(err)
  65. }
  66. }
  67. func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) {
  68. seedBroker := NewMockBroker(t, 1)
  69. defer seedBroker.Close()
  70. seedBroker.SetHandlerByMap(map[string]MockResponse{
  71. "MetadataRequest": NewMockMetadataResponse(t).
  72. SetController(seedBroker.BrokerID()).
  73. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  74. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  75. })
  76. config := NewConfig()
  77. config.Version = V0_10_2_0
  78. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  79. if err != nil {
  80. t.Fatal(err)
  81. }
  82. err = admin.CreateTopic("my_topic", nil, false)
  83. if err.Error() != "you must specify topic details" {
  84. t.Fatal(err)
  85. }
  86. err = admin.Close()
  87. if err != nil {
  88. t.Fatal(err)
  89. }
  90. }
  91. func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) {
  92. seedBroker := NewMockBroker(t, 1)
  93. defer seedBroker.Close()
  94. seedBroker.SetHandlerByMap(map[string]MockResponse{
  95. "MetadataRequest": NewMockMetadataResponse(t).
  96. SetController(seedBroker.BrokerID()).
  97. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  98. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  99. })
  100. config := NewConfig()
  101. config.Version = V0_11_0_0
  102. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  103. if err != nil {
  104. t.Fatal(err)
  105. }
  106. err = admin.CreateTopic("_internal_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  107. want := "insufficient permissions to create topic with reserved prefix"
  108. if !strings.HasSuffix(err.Error(), want) {
  109. t.Fatal(err)
  110. }
  111. err = admin.Close()
  112. if err != nil {
  113. t.Fatal(err)
  114. }
  115. }
  116. func TestClusterAdminListTopics(t *testing.T) {
  117. seedBroker := NewMockBroker(t, 1)
  118. defer seedBroker.Close()
  119. seedBroker.SetHandlerByMap(map[string]MockResponse{
  120. "MetadataRequest": NewMockMetadataResponse(t).
  121. SetController(seedBroker.BrokerID()).
  122. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  123. SetLeader("my_topic", 0, seedBroker.BrokerID()),
  124. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  125. })
  126. config := NewConfig()
  127. config.Version = V1_0_0_0
  128. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  129. if err != nil {
  130. t.Fatal(err)
  131. }
  132. entries, err := admin.ListTopics()
  133. if err != nil {
  134. t.Fatal(err)
  135. }
  136. if len(entries) <= 0 {
  137. t.Fatal(errors.New("no resource present"))
  138. }
  139. topic, found := entries["my_topic"]
  140. if !found {
  141. t.Fatal(errors.New("topic not found in response"))
  142. }
  143. _, found = topic.ConfigEntries["max.message.bytes"]
  144. if found {
  145. t.Fatal(errors.New("default topic config entry incorrectly found in response"))
  146. }
  147. value, _ := topic.ConfigEntries["retention.ms"]
  148. if value == nil || *value != "5000" {
  149. t.Fatal(errors.New("non-default topic config entry not found in response"))
  150. }
  151. err = admin.Close()
  152. if err != nil {
  153. t.Fatal(err)
  154. }
  155. if topic.ReplicaAssignment == nil || topic.ReplicaAssignment[0][0] != 1 {
  156. t.Fatal(errors.New("replica assignment not found in response"))
  157. }
  158. }
  159. func TestClusterAdminDeleteTopic(t *testing.T) {
  160. seedBroker := NewMockBroker(t, 1)
  161. defer seedBroker.Close()
  162. seedBroker.SetHandlerByMap(map[string]MockResponse{
  163. "MetadataRequest": NewMockMetadataResponse(t).
  164. SetController(seedBroker.BrokerID()).
  165. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  166. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  167. })
  168. config := NewConfig()
  169. config.Version = V0_10_2_0
  170. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  171. if err != nil {
  172. t.Fatal(err)
  173. }
  174. err = admin.DeleteTopic("my_topic")
  175. if err != nil {
  176. t.Fatal(err)
  177. }
  178. err = admin.Close()
  179. if err != nil {
  180. t.Fatal(err)
  181. }
  182. }
  183. func TestClusterAdminDeleteEmptyTopic(t *testing.T) {
  184. seedBroker := NewMockBroker(t, 1)
  185. defer seedBroker.Close()
  186. seedBroker.SetHandlerByMap(map[string]MockResponse{
  187. "MetadataRequest": NewMockMetadataResponse(t).
  188. SetController(seedBroker.BrokerID()).
  189. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  190. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  191. })
  192. config := NewConfig()
  193. config.Version = V0_10_2_0
  194. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  195. if err != nil {
  196. t.Fatal(err)
  197. }
  198. err = admin.DeleteTopic("")
  199. if err != ErrInvalidTopic {
  200. t.Fatal(err)
  201. }
  202. err = admin.Close()
  203. if err != nil {
  204. t.Fatal(err)
  205. }
  206. }
  207. func TestClusterAdminCreatePartitions(t *testing.T) {
  208. seedBroker := NewMockBroker(t, 1)
  209. defer seedBroker.Close()
  210. seedBroker.SetHandlerByMap(map[string]MockResponse{
  211. "MetadataRequest": NewMockMetadataResponse(t).
  212. SetController(seedBroker.BrokerID()).
  213. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  214. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  215. })
  216. config := NewConfig()
  217. config.Version = V1_0_0_0
  218. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  219. if err != nil {
  220. t.Fatal(err)
  221. }
  222. err = admin.CreatePartitions("my_topic", 3, nil, false)
  223. if err != nil {
  224. t.Fatal(err)
  225. }
  226. err = admin.Close()
  227. if err != nil {
  228. t.Fatal(err)
  229. }
  230. }
  231. func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) {
  232. seedBroker := NewMockBroker(t, 1)
  233. defer seedBroker.Close()
  234. seedBroker.SetHandlerByMap(map[string]MockResponse{
  235. "MetadataRequest": NewMockMetadataResponse(t).
  236. SetController(seedBroker.BrokerID()).
  237. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  238. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  239. })
  240. config := NewConfig()
  241. config.Version = V0_10_2_0
  242. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  243. if err != nil {
  244. t.Fatal(err)
  245. }
  246. err = admin.CreatePartitions("my_topic", 3, nil, false)
  247. if err != ErrUnsupportedVersion {
  248. t.Fatal(err)
  249. }
  250. err = admin.Close()
  251. if err != nil {
  252. t.Fatal(err)
  253. }
  254. }
  255. func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
  256. seedBroker := NewMockBroker(t, 1)
  257. defer seedBroker.Close()
  258. seedBroker.SetHandlerByMap(map[string]MockResponse{
  259. "MetadataRequest": NewMockMetadataResponse(t).
  260. SetController(seedBroker.BrokerID()).
  261. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  262. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  263. })
  264. config := NewConfig()
  265. config.Version = V1_0_0_0
  266. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  267. if err != nil {
  268. t.Fatal(err)
  269. }
  270. err = admin.CreatePartitions("_internal_topic", 3, nil, false)
  271. want := "insufficient permissions to create partition on topic with reserved prefix"
  272. if !strings.HasSuffix(err.Error(), want) {
  273. t.Fatal(err)
  274. }
  275. err = admin.Close()
  276. if err != nil {
  277. t.Fatal(err)
  278. }
  279. }
  280. func TestClusterAdminDeleteRecords(t *testing.T) {
  281. seedBroker := NewMockBroker(t, 1)
  282. defer seedBroker.Close()
  283. seedBroker.SetHandlerByMap(map[string]MockResponse{
  284. "MetadataRequest": NewMockMetadataResponse(t).
  285. SetController(seedBroker.BrokerID()).
  286. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  287. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  288. })
  289. config := NewConfig()
  290. config.Version = V1_0_0_0
  291. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  292. if err != nil {
  293. t.Fatal(err)
  294. }
  295. partitionOffset := make(map[int32]int64)
  296. partitionOffset[1] = 1000
  297. partitionOffset[2] = 1000
  298. partitionOffset[3] = 1000
  299. err = admin.DeleteRecords("my_topic", partitionOffset)
  300. if err != nil {
  301. t.Fatal(err)
  302. }
  303. err = admin.Close()
  304. if err != nil {
  305. t.Fatal(err)
  306. }
  307. }
  308. func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
  309. seedBroker := NewMockBroker(t, 1)
  310. defer seedBroker.Close()
  311. seedBroker.SetHandlerByMap(map[string]MockResponse{
  312. "MetadataRequest": NewMockMetadataResponse(t).
  313. SetController(seedBroker.BrokerID()).
  314. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  315. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  316. })
  317. config := NewConfig()
  318. config.Version = V0_10_2_0
  319. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  320. if err != nil {
  321. t.Fatal(err)
  322. }
  323. partitionOffset := make(map[int32]int64)
  324. partitionOffset[1] = 1000
  325. partitionOffset[2] = 1000
  326. partitionOffset[3] = 1000
  327. err = admin.DeleteRecords("my_topic", partitionOffset)
  328. if err != ErrUnsupportedVersion {
  329. t.Fatal(err)
  330. }
  331. err = admin.Close()
  332. if err != nil {
  333. t.Fatal(err)
  334. }
  335. }
  336. func TestClusterAdminDescribeConfig(t *testing.T) {
  337. seedBroker := NewMockBroker(t, 1)
  338. defer seedBroker.Close()
  339. seedBroker.SetHandlerByMap(map[string]MockResponse{
  340. "MetadataRequest": NewMockMetadataResponse(t).
  341. SetController(seedBroker.BrokerID()).
  342. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  343. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  344. })
  345. config := NewConfig()
  346. config.Version = V1_0_0_0
  347. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  348. if err != nil {
  349. t.Fatal(err)
  350. }
  351. resource := ConfigResource{Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}}
  352. entries, err := admin.DescribeConfig(resource)
  353. if err != nil {
  354. t.Fatal(err)
  355. }
  356. if len(entries) <= 0 {
  357. t.Fatal(errors.New("no resource present"))
  358. }
  359. err = admin.Close()
  360. if err != nil {
  361. t.Fatal(err)
  362. }
  363. }
  364. func TestClusterAdminAlterConfig(t *testing.T) {
  365. seedBroker := NewMockBroker(t, 1)
  366. defer seedBroker.Close()
  367. seedBroker.SetHandlerByMap(map[string]MockResponse{
  368. "MetadataRequest": NewMockMetadataResponse(t).
  369. SetController(seedBroker.BrokerID()).
  370. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  371. "AlterConfigsRequest": NewMockAlterConfigsResponse(t),
  372. })
  373. config := NewConfig()
  374. config.Version = V1_0_0_0
  375. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  376. if err != nil {
  377. t.Fatal(err)
  378. }
  379. var value string
  380. entries := make(map[string]*string)
  381. value = "3"
  382. entries["ReplicationFactor"] = &value
  383. err = admin.AlterConfig(TopicResource, "my_topic", entries, false)
  384. if err != nil {
  385. t.Fatal(err)
  386. }
  387. err = admin.Close()
  388. if err != nil {
  389. t.Fatal(err)
  390. }
  391. }
  392. func TestClusterAdminCreateAcl(t *testing.T) {
  393. seedBroker := NewMockBroker(t, 1)
  394. defer seedBroker.Close()
  395. seedBroker.SetHandlerByMap(map[string]MockResponse{
  396. "MetadataRequest": NewMockMetadataResponse(t).
  397. SetController(seedBroker.BrokerID()).
  398. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  399. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  400. })
  401. config := NewConfig()
  402. config.Version = V1_0_0_0
  403. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  404. if err != nil {
  405. t.Fatal(err)
  406. }
  407. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  408. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  409. err = admin.CreateACL(r, a)
  410. if err != nil {
  411. t.Fatal(err)
  412. }
  413. err = admin.Close()
  414. if err != nil {
  415. t.Fatal(err)
  416. }
  417. }
  418. func TestClusterAdminListAcls(t *testing.T) {
  419. seedBroker := NewMockBroker(t, 1)
  420. defer seedBroker.Close()
  421. seedBroker.SetHandlerByMap(map[string]MockResponse{
  422. "MetadataRequest": NewMockMetadataResponse(t).
  423. SetController(seedBroker.BrokerID()).
  424. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  425. "DescribeAclsRequest": NewMockListAclsResponse(t),
  426. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  427. })
  428. config := NewConfig()
  429. config.Version = V1_0_0_0
  430. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  431. if err != nil {
  432. t.Fatal(err)
  433. }
  434. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  435. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  436. err = admin.CreateACL(r, a)
  437. if err != nil {
  438. t.Fatal(err)
  439. }
  440. resourceName := "my_topic"
  441. filter := AclFilter{
  442. ResourceType: AclResourceTopic,
  443. Operation: AclOperationRead,
  444. ResourceName: &resourceName,
  445. }
  446. rAcls, err := admin.ListAcls(filter)
  447. if err != nil {
  448. t.Fatal(err)
  449. }
  450. if len(rAcls) <= 0 {
  451. t.Fatal("no acls present")
  452. }
  453. err = admin.Close()
  454. if err != nil {
  455. t.Fatal(err)
  456. }
  457. }
  458. func TestClusterAdminDeleteAcl(t *testing.T) {
  459. seedBroker := NewMockBroker(t, 1)
  460. defer seedBroker.Close()
  461. seedBroker.SetHandlerByMap(map[string]MockResponse{
  462. "MetadataRequest": NewMockMetadataResponse(t).
  463. SetController(seedBroker.BrokerID()).
  464. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  465. "DeleteAclsRequest": NewMockDeleteAclsResponse(t),
  466. })
  467. config := NewConfig()
  468. config.Version = V1_0_0_0
  469. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  470. if err != nil {
  471. t.Fatal(err)
  472. }
  473. resourceName := "my_topic"
  474. filter := AclFilter{
  475. ResourceType: AclResourceTopic,
  476. Operation: AclOperationAlter,
  477. ResourceName: &resourceName,
  478. }
  479. _, err = admin.DeleteACL(filter, false)
  480. if err != nil {
  481. t.Fatal(err)
  482. }
  483. err = admin.Close()
  484. if err != nil {
  485. t.Fatal(err)
  486. }
  487. }
  488. func TestDescribeTopic(t *testing.T) {
  489. seedBroker := NewMockBroker(t, 1)
  490. defer seedBroker.Close()
  491. seedBroker.SetHandlerByMap(map[string]MockResponse{
  492. "MetadataRequest": NewMockMetadataResponse(t).
  493. SetController(seedBroker.BrokerID()).
  494. SetLeader("my_topic", 0, seedBroker.BrokerID()).
  495. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  496. })
  497. config := NewConfig()
  498. config.Version = V1_0_0_0
  499. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  500. if err != nil {
  501. t.Fatal(err)
  502. }
  503. topics, err := admin.DescribeTopics([]string{"my_topic"})
  504. if err != nil {
  505. t.Fatal(err)
  506. }
  507. if len(topics) != 1 {
  508. t.Fatalf("Expected 1 result, got %v", len(topics))
  509. }
  510. if topics[0].Name != "my_topic" {
  511. t.Fatalf("Incorrect topic name: %v", topics[0].Name)
  512. }
  513. err = admin.Close()
  514. if err != nil {
  515. t.Fatal(err)
  516. }
  517. }
  518. func TestDescribeConsumerGroup(t *testing.T) {
  519. seedBroker := NewMockBroker(t, 1)
  520. defer seedBroker.Close()
  521. expectedGroupID := "my-group"
  522. seedBroker.SetHandlerByMap(map[string]MockResponse{
  523. "DescribeGroupsRequest": NewMockDescribeGroupsResponse(t).AddGroupDescription(expectedGroupID, &GroupDescription{
  524. GroupId: expectedGroupID,
  525. }),
  526. "MetadataRequest": NewMockMetadataResponse(t).
  527. SetController(seedBroker.BrokerID()).
  528. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  529. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker),
  530. })
  531. config := NewConfig()
  532. config.Version = V1_0_0_0
  533. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  534. if err != nil {
  535. t.Fatal(err)
  536. }
  537. result, err := admin.DescribeConsumerGroups([]string{expectedGroupID})
  538. if err != nil {
  539. t.Fatal(err)
  540. }
  541. if len(result) != 1 {
  542. t.Fatalf("Expected 1 result, got %v", len(result))
  543. }
  544. if result[0].GroupId != expectedGroupID {
  545. t.Fatalf("Expected groupID %v, got %v", expectedGroupID, result[0].GroupId)
  546. }
  547. err = admin.Close()
  548. if err != nil {
  549. t.Fatal(err)
  550. }
  551. }
  552. func TestListConsumerGroups(t *testing.T) {
  553. seedBroker := NewMockBroker(t, 1)
  554. defer seedBroker.Close()
  555. seedBroker.SetHandlerByMap(map[string]MockResponse{
  556. "MetadataRequest": NewMockMetadataResponse(t).
  557. SetController(seedBroker.BrokerID()).
  558. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  559. "ListGroupsRequest": NewMockListGroupsResponse(t).
  560. AddGroup("my-group", "consumer"),
  561. })
  562. config := NewConfig()
  563. config.Version = V1_0_0_0
  564. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  565. if err != nil {
  566. t.Fatal(err)
  567. }
  568. groups, err := admin.ListConsumerGroups()
  569. if err != nil {
  570. t.Fatal(err)
  571. }
  572. if len(groups) != 1 {
  573. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  574. }
  575. protocolType, ok := groups["my-group"]
  576. if !ok {
  577. t.Fatal("Expected group to be returned, but it did not")
  578. }
  579. if protocolType != "consumer" {
  580. t.Fatalf("Expected protocolType %v, got %v", "consumer", protocolType)
  581. }
  582. err = admin.Close()
  583. if err != nil {
  584. t.Fatal(err)
  585. }
  586. }
  587. func TestListConsumerGroupsMultiBroker(t *testing.T) {
  588. seedBroker := NewMockBroker(t, 1)
  589. defer seedBroker.Close()
  590. secondBroker := NewMockBroker(t, 2)
  591. defer secondBroker.Close()
  592. firstGroup := "first"
  593. secondGroup := "second"
  594. nonExistingGroup := "non-existing-group"
  595. seedBroker.SetHandlerByMap(map[string]MockResponse{
  596. "MetadataRequest": NewMockMetadataResponse(t).
  597. SetController(seedBroker.BrokerID()).
  598. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  599. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  600. "ListGroupsRequest": NewMockListGroupsResponse(t).
  601. AddGroup(firstGroup, "consumer"),
  602. })
  603. secondBroker.SetHandlerByMap(map[string]MockResponse{
  604. "MetadataRequest": NewMockMetadataResponse(t).
  605. SetController(seedBroker.BrokerID()).
  606. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  607. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  608. "ListGroupsRequest": NewMockListGroupsResponse(t).
  609. AddGroup(secondGroup, "consumer"),
  610. })
  611. config := NewConfig()
  612. config.Version = V1_0_0_0
  613. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  614. if err != nil {
  615. t.Fatal(err)
  616. }
  617. groups, err := admin.ListConsumerGroups()
  618. if err != nil {
  619. t.Fatal(err)
  620. }
  621. if len(groups) != 2 {
  622. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  623. }
  624. if _, found := groups[firstGroup]; !found {
  625. t.Fatalf("Expected group %v to be present in result set, but it isn't", firstGroup)
  626. }
  627. if _, found := groups[secondGroup]; !found {
  628. t.Fatalf("Expected group %v to be present in result set, but it isn't", secondGroup)
  629. }
  630. if _, found := groups[nonExistingGroup]; found {
  631. t.Fatalf("Expected group %v to not exist, but it exists", nonExistingGroup)
  632. }
  633. err = admin.Close()
  634. if err != nil {
  635. t.Fatal(err)
  636. }
  637. }
  638. func TestListConsumerGroupOffsets(t *testing.T) {
  639. seedBroker := NewMockBroker(t, 1)
  640. defer seedBroker.Close()
  641. group := "my-group"
  642. topic := "my-topic"
  643. partition := int32(0)
  644. expectedOffset := int64(0)
  645. seedBroker.SetHandlerByMap(map[string]MockResponse{
  646. "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
  647. "MetadataRequest": NewMockMetadataResponse(t).
  648. SetController(seedBroker.BrokerID()).
  649. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  650. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
  651. })
  652. config := NewConfig()
  653. config.Version = V1_0_0_0
  654. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  655. if err != nil {
  656. t.Fatal(err)
  657. }
  658. response, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{
  659. topic: []int32{0},
  660. })
  661. if err != nil {
  662. t.Fatalf("ListConsumerGroupOffsets failed with error %v", err)
  663. }
  664. block := response.GetBlock(topic, partition)
  665. if block == nil {
  666. t.Fatalf("Expected block for topic %v and partition %v to exist, but it doesn't", topic, partition)
  667. }
  668. if block.Offset != expectedOffset {
  669. t.Fatalf("Expected offset %v, got %v", expectedOffset, block.Offset)
  670. }
  671. err = admin.Close()
  672. if err != nil {
  673. t.Fatal(err)
  674. }
  675. }