admin_test.go 33 KB


  1. package sarama
  2. import (
  3. "errors"
  4. "strings"
  5. "testing"
  6. )
  7. func TestClusterAdmin(t *testing.T) {
  8. seedBroker := NewMockBroker(t, 1)
  9. defer seedBroker.Close()
  10. seedBroker.SetHandlerByMap(map[string]MockResponse{
  11. "MetadataRequest": NewMockMetadataResponse(t).
  12. SetController(seedBroker.BrokerID()).
  13. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  14. })
  15. config := NewConfig()
  16. config.Version = V1_0_0_0
  17. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  18. if err != nil {
  19. t.Fatal(err)
  20. }
  21. err = admin.Close()
  22. if err != nil {
  23. t.Fatal(err)
  24. }
  25. }
  26. func TestClusterAdminInvalidController(t *testing.T) {
  27. seedBroker := NewMockBroker(t, 1)
  28. defer seedBroker.Close()
  29. seedBroker.SetHandlerByMap(map[string]MockResponse{
  30. "MetadataRequest": NewMockMetadataResponse(t).
  31. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  32. })
  33. config := NewConfig()
  34. config.Version = V1_0_0_0
  35. _, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  36. if err == nil {
  37. t.Fatal(errors.New("controller not set still cluster admin was created"))
  38. }
  39. if err != ErrControllerNotAvailable {
  40. t.Fatal(err)
  41. }
  42. }
  43. func TestClusterAdminCreateTopic(t *testing.T) {
  44. seedBroker := NewMockBroker(t, 1)
  45. defer seedBroker.Close()
  46. seedBroker.SetHandlerByMap(map[string]MockResponse{
  47. "MetadataRequest": NewMockMetadataResponse(t).
  48. SetController(seedBroker.BrokerID()).
  49. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  50. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  51. })
  52. config := NewConfig()
  53. config.Version = V0_10_2_0
  54. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  55. if err != nil {
  56. t.Fatal(err)
  57. }
  58. err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  59. if err != nil {
  60. t.Fatal(err)
  61. }
  62. err = admin.Close()
  63. if err != nil {
  64. t.Fatal(err)
  65. }
  66. }
  67. func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) {
  68. seedBroker := NewMockBroker(t, 1)
  69. defer seedBroker.Close()
  70. seedBroker.SetHandlerByMap(map[string]MockResponse{
  71. "MetadataRequest": NewMockMetadataResponse(t).
  72. SetController(seedBroker.BrokerID()).
  73. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  74. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  75. })
  76. config := NewConfig()
  77. config.Version = V0_10_2_0
  78. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  79. if err != nil {
  80. t.Fatal(err)
  81. }
  82. err = admin.CreateTopic("my_topic", nil, false)
  83. if err.Error() != "you must specify topic details" {
  84. t.Fatal(err)
  85. }
  86. err = admin.Close()
  87. if err != nil {
  88. t.Fatal(err)
  89. }
  90. }
  91. func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) {
  92. seedBroker := NewMockBroker(t, 1)
  93. defer seedBroker.Close()
  94. seedBroker.SetHandlerByMap(map[string]MockResponse{
  95. "MetadataRequest": NewMockMetadataResponse(t).
  96. SetController(seedBroker.BrokerID()).
  97. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  98. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  99. })
  100. config := NewConfig()
  101. config.Version = V0_11_0_0
  102. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  103. if err != nil {
  104. t.Fatal(err)
  105. }
  106. err = admin.CreateTopic("_internal_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  107. want := "insufficient permissions to create topic with reserved prefix"
  108. if !strings.HasSuffix(err.Error(), want) {
  109. t.Fatal(err)
  110. }
  111. err = admin.Close()
  112. if err != nil {
  113. t.Fatal(err)
  114. }
  115. }
  116. func TestClusterAdminListTopics(t *testing.T) {
  117. seedBroker := NewMockBroker(t, 1)
  118. defer seedBroker.Close()
  119. seedBroker.SetHandlerByMap(map[string]MockResponse{
  120. "MetadataRequest": NewMockMetadataResponse(t).
  121. SetController(seedBroker.BrokerID()).
  122. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  123. SetLeader("my_topic", 0, seedBroker.BrokerID()),
  124. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  125. })
  126. config := NewConfig()
  127. config.Version = V1_1_0_0
  128. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  129. if err != nil {
  130. t.Fatal(err)
  131. }
  132. entries, err := admin.ListTopics()
  133. if err != nil {
  134. t.Fatal(err)
  135. }
  136. if len(entries) <= 0 {
  137. t.Fatal(errors.New("no resource present"))
  138. }
  139. topic, found := entries["my_topic"]
  140. if !found {
  141. t.Fatal(errors.New("topic not found in response"))
  142. }
  143. _, found = topic.ConfigEntries["max.message.bytes"]
  144. if found {
  145. t.Fatal(errors.New("default topic config entry incorrectly found in response"))
  146. }
  147. value := topic.ConfigEntries["retention.ms"]
  148. if value == nil || *value != "5000" {
  149. t.Fatal(errors.New("non-default topic config entry not found in response"))
  150. }
  151. err = admin.Close()
  152. if err != nil {
  153. t.Fatal(err)
  154. }
  155. if topic.ReplicaAssignment == nil || topic.ReplicaAssignment[0][0] != 1 {
  156. t.Fatal(errors.New("replica assignment not found in response"))
  157. }
  158. }
  159. func TestClusterAdminDeleteTopic(t *testing.T) {
  160. seedBroker := NewMockBroker(t, 1)
  161. defer seedBroker.Close()
  162. seedBroker.SetHandlerByMap(map[string]MockResponse{
  163. "MetadataRequest": NewMockMetadataResponse(t).
  164. SetController(seedBroker.BrokerID()).
  165. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  166. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  167. })
  168. config := NewConfig()
  169. config.Version = V0_10_2_0
  170. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  171. if err != nil {
  172. t.Fatal(err)
  173. }
  174. err = admin.DeleteTopic("my_topic")
  175. if err != nil {
  176. t.Fatal(err)
  177. }
  178. err = admin.Close()
  179. if err != nil {
  180. t.Fatal(err)
  181. }
  182. }
  183. func TestClusterAdminDeleteEmptyTopic(t *testing.T) {
  184. seedBroker := NewMockBroker(t, 1)
  185. defer seedBroker.Close()
  186. seedBroker.SetHandlerByMap(map[string]MockResponse{
  187. "MetadataRequest": NewMockMetadataResponse(t).
  188. SetController(seedBroker.BrokerID()).
  189. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  190. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  191. })
  192. config := NewConfig()
  193. config.Version = V0_10_2_0
  194. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  195. if err != nil {
  196. t.Fatal(err)
  197. }
  198. err = admin.DeleteTopic("")
  199. if err != ErrInvalidTopic {
  200. t.Fatal(err)
  201. }
  202. err = admin.Close()
  203. if err != nil {
  204. t.Fatal(err)
  205. }
  206. }
  207. func TestClusterAdminCreatePartitions(t *testing.T) {
  208. seedBroker := NewMockBroker(t, 1)
  209. defer seedBroker.Close()
  210. seedBroker.SetHandlerByMap(map[string]MockResponse{
  211. "MetadataRequest": NewMockMetadataResponse(t).
  212. SetController(seedBroker.BrokerID()).
  213. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  214. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  215. })
  216. config := NewConfig()
  217. config.Version = V1_0_0_0
  218. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  219. if err != nil {
  220. t.Fatal(err)
  221. }
  222. err = admin.CreatePartitions("my_topic", 3, nil, false)
  223. if err != nil {
  224. t.Fatal(err)
  225. }
  226. err = admin.Close()
  227. if err != nil {
  228. t.Fatal(err)
  229. }
  230. }
  231. func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) {
  232. seedBroker := NewMockBroker(t, 1)
  233. defer seedBroker.Close()
  234. seedBroker.SetHandlerByMap(map[string]MockResponse{
  235. "MetadataRequest": NewMockMetadataResponse(t).
  236. SetController(seedBroker.BrokerID()).
  237. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  238. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  239. })
  240. config := NewConfig()
  241. config.Version = V0_10_2_0
  242. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  243. if err != nil {
  244. t.Fatal(err)
  245. }
  246. err = admin.CreatePartitions("my_topic", 3, nil, false)
  247. if err != ErrUnsupportedVersion {
  248. t.Fatal(err)
  249. }
  250. err = admin.Close()
  251. if err != nil {
  252. t.Fatal(err)
  253. }
  254. }
  255. func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
  256. seedBroker := NewMockBroker(t, 1)
  257. defer seedBroker.Close()
  258. seedBroker.SetHandlerByMap(map[string]MockResponse{
  259. "MetadataRequest": NewMockMetadataResponse(t).
  260. SetController(seedBroker.BrokerID()).
  261. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  262. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  263. })
  264. config := NewConfig()
  265. config.Version = V1_0_0_0
  266. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  267. if err != nil {
  268. t.Fatal(err)
  269. }
  270. err = admin.CreatePartitions("_internal_topic", 3, nil, false)
  271. want := "insufficient permissions to create partition on topic with reserved prefix"
  272. if !strings.HasSuffix(err.Error(), want) {
  273. t.Fatal(err)
  274. }
  275. err = admin.Close()
  276. if err != nil {
  277. t.Fatal(err)
  278. }
  279. }
  280. func TestClusterAdminAlterPartitionReassignments(t *testing.T) {
  281. seedBroker := NewMockBroker(t, 1)
  282. defer seedBroker.Close()
  283. secondBroker := NewMockBroker(t, 2)
  284. defer secondBroker.Close()
  285. seedBroker.SetHandlerByMap(map[string]MockResponse{
  286. "MetadataRequest": NewMockMetadataResponse(t).
  287. SetController(secondBroker.BrokerID()).
  288. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  289. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  290. })
  291. secondBroker.SetHandlerByMap(map[string]MockResponse{
  292. "AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
  293. })
  294. config := NewConfig()
  295. config.Version = V2_4_0_0
  296. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  297. if err != nil {
  298. t.Fatal(err)
  299. }
  300. var topicAssignment = make([][]int32, 0, 3)
  301. err = admin.AlterPartitionReassignments("my_topic", topicAssignment)
  302. if err != nil {
  303. t.Fatal(err)
  304. }
  305. err = admin.Close()
  306. if err != nil {
  307. t.Fatal(err)
  308. }
  309. }
  310. func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) {
  311. seedBroker := NewMockBroker(t, 1)
  312. defer seedBroker.Close()
  313. secondBroker := NewMockBroker(t, 2)
  314. defer secondBroker.Close()
  315. seedBroker.SetHandlerByMap(map[string]MockResponse{
  316. "MetadataRequest": NewMockMetadataResponse(t).
  317. SetController(secondBroker.BrokerID()).
  318. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  319. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  320. })
  321. secondBroker.SetHandlerByMap(map[string]MockResponse{
  322. "AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
  323. })
  324. config := NewConfig()
  325. config.Version = V2_3_0_0
  326. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  327. if err != nil {
  328. t.Fatal(err)
  329. }
  330. var topicAssignment = make([][]int32, 0, 3)
  331. err = admin.AlterPartitionReassignments("my_topic", topicAssignment)
  332. if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) {
  333. t.Fatal(err)
  334. }
  335. err = admin.Close()
  336. if err != nil {
  337. t.Fatal(err)
  338. }
  339. }
  340. func TestClusterAdminListPartitionReassignments(t *testing.T) {
  341. seedBroker := NewMockBroker(t, 1)
  342. defer seedBroker.Close()
  343. secondBroker := NewMockBroker(t, 2)
  344. defer secondBroker.Close()
  345. seedBroker.SetHandlerByMap(map[string]MockResponse{
  346. "MetadataRequest": NewMockMetadataResponse(t).
  347. SetController(secondBroker.BrokerID()).
  348. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  349. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  350. })
  351. secondBroker.SetHandlerByMap(map[string]MockResponse{
  352. "ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
  353. })
  354. config := NewConfig()
  355. config.Version = V2_4_0_0
  356. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  357. if err != nil {
  358. t.Fatal(err)
  359. }
  360. response, err := admin.ListPartitionReassignments("my_topic", []int32{0, 1})
  361. if err != nil {
  362. t.Fatal(err)
  363. }
  364. partitionStatus, ok := response["my_topic"]
  365. if !ok {
  366. t.Fatalf("topic missing in response")
  367. } else {
  368. if len(partitionStatus) != 2 {
  369. t.Fatalf("partition missing in response")
  370. }
  371. }
  372. err = admin.Close()
  373. if err != nil {
  374. t.Fatal(err)
  375. }
  376. }
  377. func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) {
  378. seedBroker := NewMockBroker(t, 1)
  379. defer seedBroker.Close()
  380. secondBroker := NewMockBroker(t, 2)
  381. defer secondBroker.Close()
  382. seedBroker.SetHandlerByMap(map[string]MockResponse{
  383. "MetadataRequest": NewMockMetadataResponse(t).
  384. SetController(secondBroker.BrokerID()).
  385. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  386. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  387. })
  388. secondBroker.SetHandlerByMap(map[string]MockResponse{
  389. "ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
  390. })
  391. config := NewConfig()
  392. config.Version = V2_3_0_0
  393. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  394. if err != nil {
  395. t.Fatal(err)
  396. }
  397. var partitions = make([]int32, 0)
  398. _, err = admin.ListPartitionReassignments("my_topic", partitions)
  399. if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) {
  400. t.Fatal(err)
  401. }
  402. err = admin.Close()
  403. if err != nil {
  404. t.Fatal(err)
  405. }
  406. }
  407. func TestClusterAdminDeleteRecords(t *testing.T) {
  408. topicName := "my_topic"
  409. seedBroker := NewMockBroker(t, 1)
  410. defer seedBroker.Close()
  411. seedBroker.SetHandlerByMap(map[string]MockResponse{
  412. "MetadataRequest": NewMockMetadataResponse(t).
  413. SetController(seedBroker.BrokerID()).
  414. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  415. SetLeader(topicName, 1, 1).
  416. SetLeader(topicName, 2, 1).
  417. SetLeader(topicName, 3, 1),
  418. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  419. })
  420. config := NewConfig()
  421. config.Version = V1_0_0_0
  422. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  423. if err != nil {
  424. t.Fatal(err)
  425. }
  426. partitionOffsetFake := make(map[int32]int64)
  427. partitionOffsetFake[4] = 1000
  428. errFake := admin.DeleteRecords(topicName, partitionOffsetFake)
  429. if errFake == nil {
  430. t.Fatal(err)
  431. }
  432. partitionOffset := make(map[int32]int64)
  433. partitionOffset[1] = 1000
  434. partitionOffset[2] = 1000
  435. partitionOffset[3] = 1000
  436. err = admin.DeleteRecords(topicName, partitionOffset)
  437. if err != nil {
  438. t.Fatal(err)
  439. }
  440. err = admin.Close()
  441. if err != nil {
  442. t.Fatal(err)
  443. }
  444. }
  445. func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) {
  446. topicName := "my_topic"
  447. seedBroker := NewMockBroker(t, 1)
  448. secondBroker := NewMockBroker(t, 2)
  449. defer seedBroker.Close()
  450. defer secondBroker.Close()
  451. seedBroker.SetHandlerByMap(map[string]MockResponse{
  452. "MetadataRequest": NewMockMetadataResponse(t).
  453. SetController(seedBroker.BrokerID()).
  454. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  455. SetBroker(secondBroker.Addr(), secondBroker.brokerID).
  456. SetLeader(topicName, 1, 1).
  457. SetLeader(topicName, 2, 1).
  458. SetLeader(topicName, 3, 2),
  459. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  460. })
  461. secondBroker.SetHandlerByMap(map[string]MockResponse{
  462. "MetadataRequest": NewMockMetadataResponse(t).
  463. SetController(seedBroker.BrokerID()).
  464. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  465. SetBroker(secondBroker.Addr(), secondBroker.brokerID).
  466. SetLeader(topicName, 1, 1).
  467. SetLeader(topicName, 2, 1).
  468. SetLeader(topicName, 3, 2),
  469. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  470. })
  471. config := NewConfig()
  472. config.Version = V1_0_0_0
  473. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  474. if err != nil {
  475. t.Fatal(err)
  476. }
  477. partitionOffset := make(map[int32]int64)
  478. partitionOffset[1] = 1000
  479. partitionOffset[2] = 1000
  480. partitionOffset[3] = 1000
  481. err = admin.DeleteRecords(topicName, partitionOffset)
  482. if err != nil {
  483. t.Fatal(err)
  484. }
  485. err = admin.Close()
  486. if err != nil {
  487. t.Fatal(err)
  488. }
  489. }
  490. func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
  491. topicName := "my_topic"
  492. seedBroker := NewMockBroker(t, 1)
  493. defer seedBroker.Close()
  494. seedBroker.SetHandlerByMap(map[string]MockResponse{
  495. "MetadataRequest": NewMockMetadataResponse(t).
  496. SetController(seedBroker.BrokerID()).
  497. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  498. SetLeader(topicName, 1, 1).
  499. SetLeader(topicName, 2, 1).
  500. SetLeader(topicName, 3, 1),
  501. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  502. })
  503. config := NewConfig()
  504. config.Version = V0_10_2_0
  505. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  506. if err != nil {
  507. t.Fatal(err)
  508. }
  509. partitionOffset := make(map[int32]int64)
  510. partitionOffset[1] = 1000
  511. partitionOffset[2] = 1000
  512. partitionOffset[3] = 1000
  513. err = admin.DeleteRecords(topicName, partitionOffset)
  514. if !strings.HasPrefix(err.Error(), "kafka server: failed to delete records") {
  515. t.Fatal(err)
  516. }
  517. deleteRecordsError, ok := err.(ErrDeleteRecords)
  518. if !ok {
  519. t.Fatal(err)
  520. }
  521. for _, err := range *deleteRecordsError.Errors {
  522. if err != ErrUnsupportedVersion {
  523. t.Fatal(err)
  524. }
  525. }
  526. err = admin.Close()
  527. if err != nil {
  528. t.Fatal(err)
  529. }
  530. }
  531. func TestClusterAdminDescribeConfig(t *testing.T) {
  532. seedBroker := NewMockBroker(t, 1)
  533. defer seedBroker.Close()
  534. seedBroker.SetHandlerByMap(map[string]MockResponse{
  535. "MetadataRequest": NewMockMetadataResponse(t).
  536. SetController(seedBroker.BrokerID()).
  537. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  538. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  539. })
  540. var tests = []struct {
  541. saramaVersion KafkaVersion
  542. requestVersion int16
  543. includeSynonyms bool
  544. }{
  545. {V1_0_0_0, 0, false},
  546. {V1_1_0_0, 1, true},
  547. {V1_1_1_0, 1, true},
  548. {V2_0_0_0, 2, true},
  549. }
  550. for _, tt := range tests {
  551. config := NewConfig()
  552. config.Version = tt.saramaVersion
  553. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  554. if err != nil {
  555. t.Fatal(err)
  556. }
  557. defer func() {
  558. _ = admin.Close()
  559. }()
  560. resource := ConfigResource{
  561. Name: "r1",
  562. Type: TopicResource,
  563. ConfigNames: []string{"my_topic"},
  564. }
  565. entries, err := admin.DescribeConfig(resource)
  566. if err != nil {
  567. t.Fatal(err)
  568. }
  569. history := seedBroker.History()
  570. describeReq, ok := history[len(history)-1].Request.(*DescribeConfigsRequest)
  571. if !ok {
  572. t.Fatal("failed to find DescribeConfigsRequest in mockBroker history")
  573. }
  574. if describeReq.Version != tt.requestVersion {
  575. t.Fatalf(
  576. "requestVersion %v did not match expected %v",
  577. describeReq.Version, tt.requestVersion)
  578. }
  579. if len(entries) <= 0 {
  580. t.Fatal(errors.New("no resource present"))
  581. }
  582. if tt.includeSynonyms {
  583. if len(entries[0].Synonyms) == 0 {
  584. t.Fatal("expected synonyms to have been included")
  585. }
  586. }
  587. }
  588. }
  589. // TestClusterAdminDescribeBrokerConfig ensures that a describe broker config
  590. // is sent to the broker in the resource struct, _not_ the controller
  591. func TestClusterAdminDescribeBrokerConfig(t *testing.T) {
  592. controllerBroker := NewMockBroker(t, 1)
  593. defer controllerBroker.Close()
  594. configBroker := NewMockBroker(t, 2)
  595. defer configBroker.Close()
  596. controllerBroker.SetHandlerByMap(map[string]MockResponse{
  597. "MetadataRequest": NewMockMetadataResponse(t).
  598. SetController(controllerBroker.BrokerID()).
  599. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
  600. SetBroker(configBroker.Addr(), configBroker.BrokerID()),
  601. })
  602. configBroker.SetHandlerByMap(map[string]MockResponse{
  603. "MetadataRequest": NewMockMetadataResponse(t).
  604. SetController(controllerBroker.BrokerID()).
  605. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
  606. SetBroker(configBroker.Addr(), configBroker.BrokerID()),
  607. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  608. })
  609. config := NewConfig()
  610. config.Version = V1_0_0_0
  611. admin, err := NewClusterAdmin(
  612. []string{
  613. controllerBroker.Addr(),
  614. configBroker.Addr(),
  615. }, config)
  616. if err != nil {
  617. t.Fatal(err)
  618. }
  619. for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
  620. resource := ConfigResource{Name: "2", Type: resourceType}
  621. entries, err := admin.DescribeConfig(resource)
  622. if err != nil {
  623. t.Fatal(err)
  624. }
  625. if len(entries) <= 0 {
  626. t.Fatal(errors.New("no resource present"))
  627. }
  628. }
  629. err = admin.Close()
  630. if err != nil {
  631. t.Fatal(err)
  632. }
  633. }
  634. func TestClusterAdminAlterConfig(t *testing.T) {
  635. seedBroker := NewMockBroker(t, 1)
  636. defer seedBroker.Close()
  637. seedBroker.SetHandlerByMap(map[string]MockResponse{
  638. "MetadataRequest": NewMockMetadataResponse(t).
  639. SetController(seedBroker.BrokerID()).
  640. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  641. "AlterConfigsRequest": NewMockAlterConfigsResponse(t),
  642. })
  643. config := NewConfig()
  644. config.Version = V1_0_0_0
  645. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  646. if err != nil {
  647. t.Fatal(err)
  648. }
  649. var value string
  650. entries := make(map[string]*string)
  651. value = "60000"
  652. entries["retention.ms"] = &value
  653. err = admin.AlterConfig(TopicResource, "my_topic", entries, false)
  654. if err != nil {
  655. t.Fatal(err)
  656. }
  657. err = admin.Close()
  658. if err != nil {
  659. t.Fatal(err)
  660. }
  661. }
  662. func TestClusterAdminAlterBrokerConfig(t *testing.T) {
  663. controllerBroker := NewMockBroker(t, 1)
  664. defer controllerBroker.Close()
  665. configBroker := NewMockBroker(t, 2)
  666. defer configBroker.Close()
  667. controllerBroker.SetHandlerByMap(map[string]MockResponse{
  668. "MetadataRequest": NewMockMetadataResponse(t).
  669. SetController(controllerBroker.BrokerID()).
  670. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
  671. SetBroker(configBroker.Addr(), configBroker.BrokerID()),
  672. })
  673. configBroker.SetHandlerByMap(map[string]MockResponse{
  674. "MetadataRequest": NewMockMetadataResponse(t).
  675. SetController(controllerBroker.BrokerID()).
  676. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
  677. SetBroker(configBroker.Addr(), configBroker.BrokerID()),
  678. "AlterConfigsRequest": NewMockAlterConfigsResponse(t),
  679. })
  680. config := NewConfig()
  681. config.Version = V1_0_0_0
  682. admin, err := NewClusterAdmin(
  683. []string{
  684. controllerBroker.Addr(),
  685. configBroker.Addr(),
  686. }, config)
  687. if err != nil {
  688. t.Fatal(err)
  689. }
  690. var value string
  691. entries := make(map[string]*string)
  692. value = "3"
  693. entries["min.insync.replicas"] = &value
  694. for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
  695. resource := ConfigResource{Name: "2", Type: resourceType}
  696. err = admin.AlterConfig(
  697. resource.Type,
  698. resource.Name,
  699. entries,
  700. false)
  701. if err != nil {
  702. t.Fatal(err)
  703. }
  704. }
  705. err = admin.Close()
  706. if err != nil {
  707. t.Fatal(err)
  708. }
  709. }
  710. func TestClusterAdminCreateAcl(t *testing.T) {
  711. seedBroker := NewMockBroker(t, 1)
  712. defer seedBroker.Close()
  713. seedBroker.SetHandlerByMap(map[string]MockResponse{
  714. "MetadataRequest": NewMockMetadataResponse(t).
  715. SetController(seedBroker.BrokerID()).
  716. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  717. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  718. })
  719. config := NewConfig()
  720. config.Version = V1_0_0_0
  721. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  722. if err != nil {
  723. t.Fatal(err)
  724. }
  725. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  726. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  727. err = admin.CreateACL(r, a)
  728. if err != nil {
  729. t.Fatal(err)
  730. }
  731. err = admin.Close()
  732. if err != nil {
  733. t.Fatal(err)
  734. }
  735. }
  736. func TestClusterAdminListAcls(t *testing.T) {
  737. seedBroker := NewMockBroker(t, 1)
  738. defer seedBroker.Close()
  739. seedBroker.SetHandlerByMap(map[string]MockResponse{
  740. "MetadataRequest": NewMockMetadataResponse(t).
  741. SetController(seedBroker.BrokerID()).
  742. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  743. "DescribeAclsRequest": NewMockListAclsResponse(t),
  744. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  745. })
  746. config := NewConfig()
  747. config.Version = V1_0_0_0
  748. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  749. if err != nil {
  750. t.Fatal(err)
  751. }
  752. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  753. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  754. err = admin.CreateACL(r, a)
  755. if err != nil {
  756. t.Fatal(err)
  757. }
  758. resourceName := "my_topic"
  759. filter := AclFilter{
  760. ResourceType: AclResourceTopic,
  761. Operation: AclOperationRead,
  762. ResourceName: &resourceName,
  763. }
  764. rAcls, err := admin.ListAcls(filter)
  765. if err != nil {
  766. t.Fatal(err)
  767. }
  768. if len(rAcls) <= 0 {
  769. t.Fatal("no acls present")
  770. }
  771. err = admin.Close()
  772. if err != nil {
  773. t.Fatal(err)
  774. }
  775. }
  776. func TestClusterAdminDeleteAcl(t *testing.T) {
  777. seedBroker := NewMockBroker(t, 1)
  778. defer seedBroker.Close()
  779. seedBroker.SetHandlerByMap(map[string]MockResponse{
  780. "MetadataRequest": NewMockMetadataResponse(t).
  781. SetController(seedBroker.BrokerID()).
  782. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  783. "DeleteAclsRequest": NewMockDeleteAclsResponse(t),
  784. })
  785. config := NewConfig()
  786. config.Version = V1_0_0_0
  787. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  788. if err != nil {
  789. t.Fatal(err)
  790. }
  791. resourceName := "my_topic"
  792. filter := AclFilter{
  793. ResourceType: AclResourceTopic,
  794. Operation: AclOperationAlter,
  795. ResourceName: &resourceName,
  796. }
  797. _, err = admin.DeleteACL(filter, false)
  798. if err != nil {
  799. t.Fatal(err)
  800. }
  801. err = admin.Close()
  802. if err != nil {
  803. t.Fatal(err)
  804. }
  805. }
  806. func TestDescribeTopic(t *testing.T) {
  807. seedBroker := NewMockBroker(t, 1)
  808. defer seedBroker.Close()
  809. seedBroker.SetHandlerByMap(map[string]MockResponse{
  810. "MetadataRequest": NewMockMetadataResponse(t).
  811. SetController(seedBroker.BrokerID()).
  812. SetLeader("my_topic", 0, seedBroker.BrokerID()).
  813. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  814. })
  815. config := NewConfig()
  816. config.Version = V1_0_0_0
  817. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  818. if err != nil {
  819. t.Fatal(err)
  820. }
  821. topics, err := admin.DescribeTopics([]string{"my_topic"})
  822. if err != nil {
  823. t.Fatal(err)
  824. }
  825. if len(topics) != 1 {
  826. t.Fatalf("Expected 1 result, got %v", len(topics))
  827. }
  828. if topics[0].Name != "my_topic" {
  829. t.Fatalf("Incorrect topic name: %v", topics[0].Name)
  830. }
  831. err = admin.Close()
  832. if err != nil {
  833. t.Fatal(err)
  834. }
  835. }
  836. func TestDescribeTopicWithVersion0_11(t *testing.T) {
  837. seedBroker := NewMockBroker(t, 1)
  838. defer seedBroker.Close()
  839. seedBroker.SetHandlerByMap(map[string]MockResponse{
  840. "MetadataRequest": NewMockMetadataResponse(t).
  841. SetController(seedBroker.BrokerID()).
  842. SetLeader("my_topic", 0, seedBroker.BrokerID()).
  843. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  844. })
  845. config := NewConfig()
  846. config.Version = V0_11_0_0
  847. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  848. if err != nil {
  849. t.Fatal(err)
  850. }
  851. topics, err := admin.DescribeTopics([]string{"my_topic"})
  852. if err != nil {
  853. t.Fatal(err)
  854. }
  855. if len(topics) != 1 {
  856. t.Fatalf("Expected 1 result, got %v", len(topics))
  857. }
  858. if topics[0].Name != "my_topic" {
  859. t.Fatalf("Incorrect topic name: %v", topics[0].Name)
  860. }
  861. err = admin.Close()
  862. if err != nil {
  863. t.Fatal(err)
  864. }
  865. }
  866. func TestDescribeConsumerGroup(t *testing.T) {
  867. seedBroker := NewMockBroker(t, 1)
  868. defer seedBroker.Close()
  869. expectedGroupID := "my-group"
  870. seedBroker.SetHandlerByMap(map[string]MockResponse{
  871. "DescribeGroupsRequest": NewMockDescribeGroupsResponse(t).AddGroupDescription(expectedGroupID, &GroupDescription{
  872. GroupId: expectedGroupID,
  873. }),
  874. "MetadataRequest": NewMockMetadataResponse(t).
  875. SetController(seedBroker.BrokerID()).
  876. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  877. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker),
  878. })
  879. config := NewConfig()
  880. config.Version = V1_0_0_0
  881. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  882. if err != nil {
  883. t.Fatal(err)
  884. }
  885. result, err := admin.DescribeConsumerGroups([]string{expectedGroupID})
  886. if err != nil {
  887. t.Fatal(err)
  888. }
  889. if len(result) != 1 {
  890. t.Fatalf("Expected 1 result, got %v", len(result))
  891. }
  892. if result[0].GroupId != expectedGroupID {
  893. t.Fatalf("Expected groupID %v, got %v", expectedGroupID, result[0].GroupId)
  894. }
  895. err = admin.Close()
  896. if err != nil {
  897. t.Fatal(err)
  898. }
  899. }
  900. func TestListConsumerGroups(t *testing.T) {
  901. seedBroker := NewMockBroker(t, 1)
  902. defer seedBroker.Close()
  903. seedBroker.SetHandlerByMap(map[string]MockResponse{
  904. "MetadataRequest": NewMockMetadataResponse(t).
  905. SetController(seedBroker.BrokerID()).
  906. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  907. "ListGroupsRequest": NewMockListGroupsResponse(t).
  908. AddGroup("my-group", "consumer"),
  909. })
  910. config := NewConfig()
  911. config.Version = V1_0_0_0
  912. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  913. if err != nil {
  914. t.Fatal(err)
  915. }
  916. groups, err := admin.ListConsumerGroups()
  917. if err != nil {
  918. t.Fatal(err)
  919. }
  920. if len(groups) != 1 {
  921. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  922. }
  923. protocolType, ok := groups["my-group"]
  924. if !ok {
  925. t.Fatal("Expected group to be returned, but it did not")
  926. }
  927. if protocolType != "consumer" {
  928. t.Fatalf("Expected protocolType %v, got %v", "consumer", protocolType)
  929. }
  930. err = admin.Close()
  931. if err != nil {
  932. t.Fatal(err)
  933. }
  934. }
  935. func TestListConsumerGroupsMultiBroker(t *testing.T) {
  936. seedBroker := NewMockBroker(t, 1)
  937. defer seedBroker.Close()
  938. secondBroker := NewMockBroker(t, 2)
  939. defer secondBroker.Close()
  940. firstGroup := "first"
  941. secondGroup := "second"
  942. nonExistingGroup := "non-existing-group"
  943. seedBroker.SetHandlerByMap(map[string]MockResponse{
  944. "MetadataRequest": NewMockMetadataResponse(t).
  945. SetController(seedBroker.BrokerID()).
  946. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  947. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  948. "ListGroupsRequest": NewMockListGroupsResponse(t).
  949. AddGroup(firstGroup, "consumer"),
  950. })
  951. secondBroker.SetHandlerByMap(map[string]MockResponse{
  952. "MetadataRequest": NewMockMetadataResponse(t).
  953. SetController(seedBroker.BrokerID()).
  954. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  955. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  956. "ListGroupsRequest": NewMockListGroupsResponse(t).
  957. AddGroup(secondGroup, "consumer"),
  958. })
  959. config := NewConfig()
  960. config.Version = V1_0_0_0
  961. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  962. if err != nil {
  963. t.Fatal(err)
  964. }
  965. groups, err := admin.ListConsumerGroups()
  966. if err != nil {
  967. t.Fatal(err)
  968. }
  969. if len(groups) != 2 {
  970. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  971. }
  972. if _, found := groups[firstGroup]; !found {
  973. t.Fatalf("Expected group %v to be present in result set, but it isn't", firstGroup)
  974. }
  975. if _, found := groups[secondGroup]; !found {
  976. t.Fatalf("Expected group %v to be present in result set, but it isn't", secondGroup)
  977. }
  978. if _, found := groups[nonExistingGroup]; found {
  979. t.Fatalf("Expected group %v to not exist, but it exists", nonExistingGroup)
  980. }
  981. err = admin.Close()
  982. if err != nil {
  983. t.Fatal(err)
  984. }
  985. }
  986. func TestListConsumerGroupOffsets(t *testing.T) {
  987. seedBroker := NewMockBroker(t, 1)
  988. defer seedBroker.Close()
  989. group := "my-group"
  990. topic := "my-topic"
  991. partition := int32(0)
  992. expectedOffset := int64(0)
  993. seedBroker.SetHandlerByMap(map[string]MockResponse{
  994. "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError).SetError(ErrNoError),
  995. "MetadataRequest": NewMockMetadataResponse(t).
  996. SetController(seedBroker.BrokerID()).
  997. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  998. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
  999. })
  1000. config := NewConfig()
  1001. config.Version = V1_0_0_0
  1002. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  1003. if err != nil {
  1004. t.Fatal(err)
  1005. }
  1006. response, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{
  1007. topic: {0},
  1008. })
  1009. if err != nil {
  1010. t.Fatalf("ListConsumerGroupOffsets failed with error %v", err)
  1011. }
  1012. block := response.GetBlock(topic, partition)
  1013. if block == nil {
  1014. t.Fatalf("Expected block for topic %v and partition %v to exist, but it doesn't", topic, partition)
  1015. }
  1016. if block.Offset != expectedOffset {
  1017. t.Fatalf("Expected offset %v, got %v", expectedOffset, block.Offset)
  1018. }
  1019. err = admin.Close()
  1020. if err != nil {
  1021. t.Fatal(err)
  1022. }
  1023. }
  1024. func TestDeleteConsumerGroup(t *testing.T) {
  1025. seedBroker := NewMockBroker(t, 1)
  1026. defer seedBroker.Close()
  1027. group := "my-group"
  1028. seedBroker.SetHandlerByMap(map[string]MockResponse{
  1029. // "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
  1030. "DeleteGroupsRequest": NewMockDeleteGroupsRequest(t).SetDeletedGroups([]string{group}),
  1031. "MetadataRequest": NewMockMetadataResponse(t).
  1032. SetController(seedBroker.BrokerID()).
  1033. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  1034. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
  1035. })
  1036. config := NewConfig()
  1037. config.Version = V1_1_0_0
  1038. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  1039. if err != nil {
  1040. t.Fatal(err)
  1041. }
  1042. err = admin.DeleteConsumerGroup(group)
  1043. if err != nil {
  1044. t.Fatalf("DeleteConsumerGroup failed with error %v", err)
  1045. }
  1046. }
  1047. // TestRefreshMetaDataWithDifferentController ensures that the cached
  1048. // controller can be forcibly updated from Metadata by the admin client
  1049. func TestRefreshMetaDataWithDifferentController(t *testing.T) {
  1050. seedBroker1 := NewMockBroker(t, 1)
  1051. seedBroker2 := NewMockBroker(t, 2)
  1052. defer seedBroker1.Close()
  1053. defer seedBroker2.Close()
  1054. seedBroker1.SetHandlerByMap(map[string]MockResponse{
  1055. "MetadataRequest": NewMockMetadataResponse(t).
  1056. SetController(seedBroker1.BrokerID()).
  1057. SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
  1058. SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
  1059. })
  1060. config := NewConfig()
  1061. config.Version = V1_1_0_0
  1062. client, err := NewClient([]string{seedBroker1.Addr()}, config)
  1063. if err != nil {
  1064. t.Fatal(err)
  1065. }
  1066. ca := clusterAdmin{client: client, conf: config}
  1067. if b, _ := ca.Controller(); seedBroker1.BrokerID() != b.ID() {
  1068. t.Fatalf("expected cached controller to be %d rather than %d",
  1069. seedBroker1.BrokerID(), b.ID())
  1070. }
  1071. seedBroker1.SetHandlerByMap(map[string]MockResponse{
  1072. "MetadataRequest": NewMockMetadataResponse(t).
  1073. SetController(seedBroker2.BrokerID()).
  1074. SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
  1075. SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
  1076. })
  1077. if b, _ := ca.refreshController(); seedBroker2.BrokerID() != b.ID() {
  1078. t.Fatalf("expected refreshed controller to be %d rather than %d",
  1079. seedBroker2.BrokerID(), b.ID())
  1080. }
  1081. if b, _ := ca.Controller(); seedBroker2.BrokerID() != b.ID() {
  1082. t.Fatalf("expected cached controller to be %d rather than %d",
  1083. seedBroker2.BrokerID(), b.ID())
  1084. }
  1085. }