admin_test.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788
  1. package sarama
  2. import (
  3. "errors"
  4. "testing"
  5. )
  6. func TestClusterAdmin(t *testing.T) {
  7. seedBroker := NewMockBroker(t, 1)
  8. defer seedBroker.Close()
  9. seedBroker.SetHandlerByMap(map[string]MockResponse{
  10. "MetadataRequest": NewMockMetadataResponse(t).
  11. SetController(seedBroker.BrokerID()).
  12. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  13. })
  14. config := NewConfig()
  15. config.Version = V1_0_0_0
  16. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  17. if err != nil {
  18. t.Fatal(err)
  19. }
  20. err = admin.Close()
  21. if err != nil {
  22. t.Fatal(err)
  23. }
  24. }
  25. func TestClusterAdminInvalidController(t *testing.T) {
  26. seedBroker := NewMockBroker(t, 1)
  27. defer seedBroker.Close()
  28. seedBroker.SetHandlerByMap(map[string]MockResponse{
  29. "MetadataRequest": NewMockMetadataResponse(t).
  30. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  31. })
  32. config := NewConfig()
  33. config.Version = V1_0_0_0
  34. _, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  35. if err == nil {
  36. t.Fatal(errors.New("controller not set still cluster admin was created"))
  37. }
  38. if err != ErrControllerNotAvailable {
  39. t.Fatal(err)
  40. }
  41. }
  42. func TestClusterAdminCreateTopic(t *testing.T) {
  43. seedBroker := NewMockBroker(t, 1)
  44. defer seedBroker.Close()
  45. seedBroker.SetHandlerByMap(map[string]MockResponse{
  46. "MetadataRequest": NewMockMetadataResponse(t).
  47. SetController(seedBroker.BrokerID()).
  48. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  49. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  50. })
  51. config := NewConfig()
  52. config.Version = V0_10_2_0
  53. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  54. if err != nil {
  55. t.Fatal(err)
  56. }
  57. err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  58. if err != nil {
  59. t.Fatal(err)
  60. }
  61. err = admin.Close()
  62. if err != nil {
  63. t.Fatal(err)
  64. }
  65. }
  66. func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) {
  67. seedBroker := NewMockBroker(t, 1)
  68. defer seedBroker.Close()
  69. seedBroker.SetHandlerByMap(map[string]MockResponse{
  70. "MetadataRequest": NewMockMetadataResponse(t).
  71. SetController(seedBroker.BrokerID()).
  72. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  73. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  74. })
  75. config := NewConfig()
  76. config.Version = V0_10_2_0
  77. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  78. if err != nil {
  79. t.Fatal(err)
  80. }
  81. err = admin.CreateTopic("my_topic", nil, false)
  82. if err.Error() != "You must specify topic details" {
  83. t.Fatal(err)
  84. }
  85. err = admin.Close()
  86. if err != nil {
  87. t.Fatal(err)
  88. }
  89. }
  90. func TestClusterAdminCreateTopicWithDiffVersion(t *testing.T) {
  91. seedBroker := NewMockBroker(t, 1)
  92. defer seedBroker.Close()
  93. seedBroker.SetHandlerByMap(map[string]MockResponse{
  94. "MetadataRequest": NewMockMetadataResponse(t).
  95. SetController(seedBroker.BrokerID()).
  96. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  97. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  98. })
  99. config := NewConfig()
  100. config.Version = V0_11_0_0
  101. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  102. if err != nil {
  103. t.Fatal(err)
  104. }
  105. err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  106. if err != ErrInsufficientData {
  107. t.Fatal(err)
  108. }
  109. err = admin.Close()
  110. if err != nil {
  111. t.Fatal(err)
  112. }
  113. }
  114. func TestClusterAdminListTopics(t *testing.T) {
  115. seedBroker := NewMockBroker(t, 1)
  116. defer seedBroker.Close()
  117. seedBroker.SetHandlerByMap(map[string]MockResponse{
  118. "MetadataRequest": NewMockMetadataResponse(t).
  119. SetController(seedBroker.BrokerID()).
  120. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  121. SetLeader("my_topic", 0, seedBroker.BrokerID()),
  122. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  123. })
  124. config := NewConfig()
  125. config.Version = V1_0_0_0
  126. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  127. if err != nil {
  128. t.Fatal(err)
  129. }
  130. entries, err := admin.ListTopics()
  131. if err != nil {
  132. t.Fatal(err)
  133. }
  134. if len(entries) <= 0 {
  135. t.Fatal(errors.New("no resource present"))
  136. }
  137. topic, found := entries["my_topic"]
  138. if !found {
  139. t.Fatal(errors.New("topic not found in response"))
  140. }
  141. _, found = topic.ConfigEntries["max.message.bytes"]
  142. if found {
  143. t.Fatal(errors.New("default topic config entry incorrectly found in response"))
  144. }
  145. value, _ := topic.ConfigEntries["retention.ms"]
  146. if value == nil || *value != "5000" {
  147. t.Fatal(errors.New("non-default topic config entry not found in response"))
  148. }
  149. err = admin.Close()
  150. if err != nil {
  151. t.Fatal(err)
  152. }
  153. }
  154. func TestClusterAdminDeleteTopic(t *testing.T) {
  155. seedBroker := NewMockBroker(t, 1)
  156. defer seedBroker.Close()
  157. seedBroker.SetHandlerByMap(map[string]MockResponse{
  158. "MetadataRequest": NewMockMetadataResponse(t).
  159. SetController(seedBroker.BrokerID()).
  160. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  161. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  162. })
  163. config := NewConfig()
  164. config.Version = V0_10_2_0
  165. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  166. if err != nil {
  167. t.Fatal(err)
  168. }
  169. err = admin.DeleteTopic("my_topic")
  170. if err != nil {
  171. t.Fatal(err)
  172. }
  173. err = admin.Close()
  174. if err != nil {
  175. t.Fatal(err)
  176. }
  177. }
  178. func TestClusterAdminDeleteEmptyTopic(t *testing.T) {
  179. seedBroker := NewMockBroker(t, 1)
  180. defer seedBroker.Close()
  181. seedBroker.SetHandlerByMap(map[string]MockResponse{
  182. "MetadataRequest": NewMockMetadataResponse(t).
  183. SetController(seedBroker.BrokerID()).
  184. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  185. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  186. })
  187. config := NewConfig()
  188. config.Version = V0_10_2_0
  189. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  190. if err != nil {
  191. t.Fatal(err)
  192. }
  193. err = admin.DeleteTopic("")
  194. if err != ErrInvalidTopic {
  195. t.Fatal(err)
  196. }
  197. err = admin.Close()
  198. if err != nil {
  199. t.Fatal(err)
  200. }
  201. }
  202. func TestClusterAdminCreatePartitions(t *testing.T) {
  203. seedBroker := NewMockBroker(t, 1)
  204. defer seedBroker.Close()
  205. seedBroker.SetHandlerByMap(map[string]MockResponse{
  206. "MetadataRequest": NewMockMetadataResponse(t).
  207. SetController(seedBroker.BrokerID()).
  208. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  209. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  210. })
  211. config := NewConfig()
  212. config.Version = V1_0_0_0
  213. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  214. if err != nil {
  215. t.Fatal(err)
  216. }
  217. err = admin.CreatePartitions("my_topic", 3, nil, false)
  218. if err != nil {
  219. t.Fatal(err)
  220. }
  221. err = admin.Close()
  222. if err != nil {
  223. t.Fatal(err)
  224. }
  225. }
  226. func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) {
  227. seedBroker := NewMockBroker(t, 1)
  228. defer seedBroker.Close()
  229. seedBroker.SetHandlerByMap(map[string]MockResponse{
  230. "MetadataRequest": NewMockMetadataResponse(t).
  231. SetController(seedBroker.BrokerID()).
  232. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  233. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  234. })
  235. config := NewConfig()
  236. config.Version = V0_10_2_0
  237. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  238. if err != nil {
  239. t.Fatal(err)
  240. }
  241. err = admin.CreatePartitions("my_topic", 3, nil, false)
  242. if err != ErrUnsupportedVersion {
  243. t.Fatal(err)
  244. }
  245. err = admin.Close()
  246. if err != nil {
  247. t.Fatal(err)
  248. }
  249. }
  250. func TestClusterAdminDeleteRecords(t *testing.T) {
  251. seedBroker := NewMockBroker(t, 1)
  252. defer seedBroker.Close()
  253. seedBroker.SetHandlerByMap(map[string]MockResponse{
  254. "MetadataRequest": NewMockMetadataResponse(t).
  255. SetController(seedBroker.BrokerID()).
  256. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  257. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  258. })
  259. config := NewConfig()
  260. config.Version = V1_0_0_0
  261. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  262. if err != nil {
  263. t.Fatal(err)
  264. }
  265. partitionOffset := make(map[int32]int64)
  266. partitionOffset[1] = 1000
  267. partitionOffset[2] = 1000
  268. partitionOffset[3] = 1000
  269. err = admin.DeleteRecords("my_topic", partitionOffset)
  270. if err != nil {
  271. t.Fatal(err)
  272. }
  273. err = admin.Close()
  274. if err != nil {
  275. t.Fatal(err)
  276. }
  277. }
  278. func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
  279. seedBroker := NewMockBroker(t, 1)
  280. defer seedBroker.Close()
  281. seedBroker.SetHandlerByMap(map[string]MockResponse{
  282. "MetadataRequest": NewMockMetadataResponse(t).
  283. SetController(seedBroker.BrokerID()).
  284. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  285. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  286. })
  287. config := NewConfig()
  288. config.Version = V0_10_2_0
  289. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  290. if err != nil {
  291. t.Fatal(err)
  292. }
  293. partitionOffset := make(map[int32]int64)
  294. partitionOffset[1] = 1000
  295. partitionOffset[2] = 1000
  296. partitionOffset[3] = 1000
  297. err = admin.DeleteRecords("my_topic", partitionOffset)
  298. if err != ErrUnsupportedVersion {
  299. t.Fatal(err)
  300. }
  301. err = admin.Close()
  302. if err != nil {
  303. t.Fatal(err)
  304. }
  305. }
  306. func TestClusterAdminDescribeConfig(t *testing.T) {
  307. seedBroker := NewMockBroker(t, 1)
  308. defer seedBroker.Close()
  309. seedBroker.SetHandlerByMap(map[string]MockResponse{
  310. "MetadataRequest": NewMockMetadataResponse(t).
  311. SetController(seedBroker.BrokerID()).
  312. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  313. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  314. })
  315. config := NewConfig()
  316. config.Version = V1_0_0_0
  317. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  318. if err != nil {
  319. t.Fatal(err)
  320. }
  321. resource := ConfigResource{Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}}
  322. entries, err := admin.DescribeConfig(resource)
  323. if err != nil {
  324. t.Fatal(err)
  325. }
  326. if len(entries) <= 0 {
  327. t.Fatal(errors.New("no resource present"))
  328. }
  329. err = admin.Close()
  330. if err != nil {
  331. t.Fatal(err)
  332. }
  333. }
  334. func TestClusterAdminAlterConfig(t *testing.T) {
  335. seedBroker := NewMockBroker(t, 1)
  336. defer seedBroker.Close()
  337. seedBroker.SetHandlerByMap(map[string]MockResponse{
  338. "MetadataRequest": NewMockMetadataResponse(t).
  339. SetController(seedBroker.BrokerID()).
  340. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  341. "AlterConfigsRequest": NewMockAlterConfigsResponse(t),
  342. })
  343. config := NewConfig()
  344. config.Version = V1_0_0_0
  345. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  346. if err != nil {
  347. t.Fatal(err)
  348. }
  349. var value string
  350. entries := make(map[string]*string)
  351. value = "3"
  352. entries["ReplicationFactor"] = &value
  353. err = admin.AlterConfig(TopicResource, "my_topic", entries, false)
  354. if err != nil {
  355. t.Fatal(err)
  356. }
  357. err = admin.Close()
  358. if err != nil {
  359. t.Fatal(err)
  360. }
  361. }
  362. func TestClusterAdminCreateAcl(t *testing.T) {
  363. seedBroker := NewMockBroker(t, 1)
  364. defer seedBroker.Close()
  365. seedBroker.SetHandlerByMap(map[string]MockResponse{
  366. "MetadataRequest": NewMockMetadataResponse(t).
  367. SetController(seedBroker.BrokerID()).
  368. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  369. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  370. })
  371. config := NewConfig()
  372. config.Version = V1_0_0_0
  373. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  374. if err != nil {
  375. t.Fatal(err)
  376. }
  377. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  378. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  379. err = admin.CreateACL(r, a)
  380. if err != nil {
  381. t.Fatal(err)
  382. }
  383. err = admin.Close()
  384. if err != nil {
  385. t.Fatal(err)
  386. }
  387. }
  388. func TestClusterAdminListAcls(t *testing.T) {
  389. seedBroker := NewMockBroker(t, 1)
  390. defer seedBroker.Close()
  391. seedBroker.SetHandlerByMap(map[string]MockResponse{
  392. "MetadataRequest": NewMockMetadataResponse(t).
  393. SetController(seedBroker.BrokerID()).
  394. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  395. "DescribeAclsRequest": NewMockListAclsResponse(t),
  396. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  397. })
  398. config := NewConfig()
  399. config.Version = V1_0_0_0
  400. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  401. if err != nil {
  402. t.Fatal(err)
  403. }
  404. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  405. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  406. err = admin.CreateACL(r, a)
  407. if err != nil {
  408. t.Fatal(err)
  409. }
  410. resourceName := "my_topic"
  411. filter := AclFilter{
  412. ResourceType: AclResourceTopic,
  413. Operation: AclOperationRead,
  414. ResourceName: &resourceName,
  415. }
  416. rAcls, err := admin.ListAcls(filter)
  417. if err != nil {
  418. t.Fatal(err)
  419. }
  420. if len(rAcls) <= 0 {
  421. t.Fatal("no acls present")
  422. }
  423. err = admin.Close()
  424. if err != nil {
  425. t.Fatal(err)
  426. }
  427. }
  428. func TestClusterAdminDeleteAcl(t *testing.T) {
  429. seedBroker := NewMockBroker(t, 1)
  430. defer seedBroker.Close()
  431. seedBroker.SetHandlerByMap(map[string]MockResponse{
  432. "MetadataRequest": NewMockMetadataResponse(t).
  433. SetController(seedBroker.BrokerID()).
  434. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  435. "DeleteAclsRequest": NewMockDeleteAclsResponse(t),
  436. })
  437. config := NewConfig()
  438. config.Version = V1_0_0_0
  439. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  440. if err != nil {
  441. t.Fatal(err)
  442. }
  443. resourceName := "my_topic"
  444. filter := AclFilter{
  445. ResourceType: AclResourceTopic,
  446. Operation: AclOperationAlter,
  447. ResourceName: &resourceName,
  448. }
  449. _, err = admin.DeleteACL(filter, false)
  450. if err != nil {
  451. t.Fatal(err)
  452. }
  453. err = admin.Close()
  454. if err != nil {
  455. t.Fatal(err)
  456. }
  457. }
  458. func TestDescribeTopic(t *testing.T) {
  459. seedBroker := NewMockBroker(t, 1)
  460. defer seedBroker.Close()
  461. seedBroker.SetHandlerByMap(map[string]MockResponse{
  462. "MetadataRequest": NewMockMetadataResponse(t).
  463. SetController(seedBroker.BrokerID()).
  464. SetLeader("my_topic", 0, seedBroker.BrokerID()).
  465. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  466. })
  467. config := NewConfig()
  468. config.Version = V1_0_0_0
  469. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  470. if err != nil {
  471. t.Fatal(err)
  472. }
  473. topics, err := admin.DescribeTopics([]string{"my_topic"})
  474. if err != nil {
  475. t.Fatal(err)
  476. }
  477. if len(topics) != 1 {
  478. t.Fatalf("Expected 1 result, got %v", len(topics))
  479. }
  480. if topics[0].Name != "my_topic" {
  481. t.Fatalf("Incorrect topic name: %v", topics[0].Name)
  482. }
  483. err = admin.Close()
  484. if err != nil {
  485. t.Fatal(err)
  486. }
  487. }
  488. func TestDescribeConsumerGroup(t *testing.T) {
  489. seedBroker := NewMockBroker(t, 1)
  490. defer seedBroker.Close()
  491. expectedGroupID := "my-group"
  492. seedBroker.SetHandlerByMap(map[string]MockResponse{
  493. "DescribeGroupsRequest": NewMockDescribeGroupsResponse(t).AddGroupDescription(expectedGroupID, &GroupDescription{
  494. GroupId: expectedGroupID,
  495. }),
  496. "MetadataRequest": NewMockMetadataResponse(t).
  497. SetController(seedBroker.BrokerID()).
  498. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  499. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker),
  500. })
  501. config := NewConfig()
  502. config.Version = V1_0_0_0
  503. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  504. if err != nil {
  505. t.Fatal(err)
  506. }
  507. result, err := admin.DescribeConsumerGroups([]string{expectedGroupID})
  508. if err != nil {
  509. t.Fatal(err)
  510. }
  511. if len(result) != 1 {
  512. t.Fatalf("Expected 1 result, got %v", len(result))
  513. }
  514. if result[0].GroupId != expectedGroupID {
  515. t.Fatalf("Expected groupID %v, got %v", expectedGroupID, result[0].GroupId)
  516. }
  517. err = admin.Close()
  518. if err != nil {
  519. t.Fatal(err)
  520. }
  521. }
  522. func TestListConsumerGroups(t *testing.T) {
  523. seedBroker := NewMockBroker(t, 1)
  524. defer seedBroker.Close()
  525. seedBroker.SetHandlerByMap(map[string]MockResponse{
  526. "MetadataRequest": NewMockMetadataResponse(t).
  527. SetController(seedBroker.BrokerID()).
  528. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  529. "ListGroupsRequest": NewMockListGroupsResponse(t).
  530. AddGroup("my-group", "consumer"),
  531. })
  532. config := NewConfig()
  533. config.Version = V1_0_0_0
  534. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  535. if err != nil {
  536. t.Fatal(err)
  537. }
  538. groups, err := admin.ListConsumerGroups()
  539. if err != nil {
  540. t.Fatal(err)
  541. }
  542. if len(groups) != 1 {
  543. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  544. }
  545. protocolType, ok := groups["my-group"]
  546. if !ok {
  547. t.Fatal("Expected group to be returned, but it did not")
  548. }
  549. if protocolType != "consumer" {
  550. t.Fatalf("Expected protocolType %v, got %v", "consumer", protocolType)
  551. }
  552. err = admin.Close()
  553. if err != nil {
  554. t.Fatal(err)
  555. }
  556. }
  557. func TestListConsumerGroupsMultiBroker(t *testing.T) {
  558. seedBroker := NewMockBroker(t, 1)
  559. defer seedBroker.Close()
  560. secondBroker := NewMockBroker(t, 2)
  561. defer secondBroker.Close()
  562. firstGroup := "first"
  563. secondGroup := "second"
  564. nonExistingGroup := "non-existing-group"
  565. seedBroker.SetHandlerByMap(map[string]MockResponse{
  566. "MetadataRequest": NewMockMetadataResponse(t).
  567. SetController(seedBroker.BrokerID()).
  568. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  569. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  570. "ListGroupsRequest": NewMockListGroupsResponse(t).
  571. AddGroup(firstGroup, "consumer"),
  572. })
  573. secondBroker.SetHandlerByMap(map[string]MockResponse{
  574. "MetadataRequest": NewMockMetadataResponse(t).
  575. SetController(seedBroker.BrokerID()).
  576. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  577. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  578. "ListGroupsRequest": NewMockListGroupsResponse(t).
  579. AddGroup(secondGroup, "consumer"),
  580. })
  581. config := NewConfig()
  582. config.Version = V1_0_0_0
  583. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  584. if err != nil {
  585. t.Fatal(err)
  586. }
  587. groups, err := admin.ListConsumerGroups()
  588. if err != nil {
  589. t.Fatal(err)
  590. }
  591. if len(groups) != 2 {
  592. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  593. }
  594. if _, found := groups[firstGroup]; !found {
  595. t.Fatalf("Expected group %v to be present in result set, but it isn't", firstGroup)
  596. }
  597. if _, found := groups[secondGroup]; !found {
  598. t.Fatalf("Expected group %v to be present in result set, but it isn't", secondGroup)
  599. }
  600. if _, found := groups[nonExistingGroup]; found {
  601. t.Fatalf("Expected group %v to not exist, but it exists", nonExistingGroup)
  602. }
  603. err = admin.Close()
  604. if err != nil {
  605. t.Fatal(err)
  606. }
  607. }
  608. func TestListConsumerGroupOffsets(t *testing.T) {
  609. seedBroker := NewMockBroker(t, 1)
  610. defer seedBroker.Close()
  611. group := "my-group"
  612. topic := "my-topic"
  613. partition := int32(0)
  614. expectedOffset := int64(0)
  615. seedBroker.SetHandlerByMap(map[string]MockResponse{
  616. "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
  617. "MetadataRequest": NewMockMetadataResponse(t).
  618. SetController(seedBroker.BrokerID()).
  619. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  620. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
  621. })
  622. config := NewConfig()
  623. config.Version = V1_0_0_0
  624. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  625. if err != nil {
  626. t.Fatal(err)
  627. }
  628. response, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{
  629. topic: []int32{0},
  630. })
  631. if err != nil {
  632. t.Fatalf("ListConsumerGroupOffsets failed with error %v", err)
  633. }
  634. block := response.GetBlock(topic, partition)
  635. if block == nil {
  636. t.Fatalf("Expected block for topic %v and partition %v to exist, but it doesn't", topic, partition)
  637. }
  638. if block.Offset != expectedOffset {
  639. t.Fatalf("Expected offset %v, got %v", expectedOffset, block.Offset)
  640. }
  641. err = admin.Close()
  642. if err != nil {
  643. t.Fatal(err)
  644. }
  645. }