admin_test.go 29 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150
  1. package sarama
  2. import (
  3. "errors"
  4. "strings"
  5. "testing"
  6. )
  7. func TestClusterAdmin(t *testing.T) {
  8. seedBroker := NewMockBroker(t, 1)
  9. defer seedBroker.Close()
  10. seedBroker.SetHandlerByMap(map[string]MockResponse{
  11. "MetadataRequest": NewMockMetadataResponse(t).
  12. SetController(seedBroker.BrokerID()).
  13. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  14. })
  15. config := NewConfig()
  16. config.Version = V1_0_0_0
  17. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  18. if err != nil {
  19. t.Fatal(err)
  20. }
  21. err = admin.Close()
  22. if err != nil {
  23. t.Fatal(err)
  24. }
  25. }
  26. func TestClusterAdminInvalidController(t *testing.T) {
  27. seedBroker := NewMockBroker(t, 1)
  28. defer seedBroker.Close()
  29. seedBroker.SetHandlerByMap(map[string]MockResponse{
  30. "MetadataRequest": NewMockMetadataResponse(t).
  31. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  32. })
  33. config := NewConfig()
  34. config.Version = V1_0_0_0
  35. _, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  36. if err == nil {
  37. t.Fatal(errors.New("controller not set still cluster admin was created"))
  38. }
  39. if err != ErrControllerNotAvailable {
  40. t.Fatal(err)
  41. }
  42. }
  43. func TestClusterAdminCreateTopic(t *testing.T) {
  44. seedBroker := NewMockBroker(t, 1)
  45. defer seedBroker.Close()
  46. seedBroker.SetHandlerByMap(map[string]MockResponse{
  47. "MetadataRequest": NewMockMetadataResponse(t).
  48. SetController(seedBroker.BrokerID()).
  49. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  50. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  51. })
  52. config := NewConfig()
  53. config.Version = V0_10_2_0
  54. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  55. if err != nil {
  56. t.Fatal(err)
  57. }
  58. err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  59. if err != nil {
  60. t.Fatal(err)
  61. }
  62. err = admin.Close()
  63. if err != nil {
  64. t.Fatal(err)
  65. }
  66. }
  67. func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) {
  68. seedBroker := NewMockBroker(t, 1)
  69. defer seedBroker.Close()
  70. seedBroker.SetHandlerByMap(map[string]MockResponse{
  71. "MetadataRequest": NewMockMetadataResponse(t).
  72. SetController(seedBroker.BrokerID()).
  73. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  74. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  75. })
  76. config := NewConfig()
  77. config.Version = V0_10_2_0
  78. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  79. if err != nil {
  80. t.Fatal(err)
  81. }
  82. err = admin.CreateTopic("my_topic", nil, false)
  83. if err.Error() != "you must specify topic details" {
  84. t.Fatal(err)
  85. }
  86. err = admin.Close()
  87. if err != nil {
  88. t.Fatal(err)
  89. }
  90. }
  91. func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) {
  92. seedBroker := NewMockBroker(t, 1)
  93. defer seedBroker.Close()
  94. seedBroker.SetHandlerByMap(map[string]MockResponse{
  95. "MetadataRequest": NewMockMetadataResponse(t).
  96. SetController(seedBroker.BrokerID()).
  97. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  98. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  99. })
  100. config := NewConfig()
  101. config.Version = V0_11_0_0
  102. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  103. if err != nil {
  104. t.Fatal(err)
  105. }
  106. err = admin.CreateTopic("_internal_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  107. want := "insufficient permissions to create topic with reserved prefix"
  108. if !strings.HasSuffix(err.Error(), want) {
  109. t.Fatal(err)
  110. }
  111. err = admin.Close()
  112. if err != nil {
  113. t.Fatal(err)
  114. }
  115. }
  116. func TestClusterAdminListTopics(t *testing.T) {
  117. seedBroker := NewMockBroker(t, 1)
  118. defer seedBroker.Close()
  119. seedBroker.SetHandlerByMap(map[string]MockResponse{
  120. "MetadataRequest": NewMockMetadataResponse(t).
  121. SetController(seedBroker.BrokerID()).
  122. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  123. SetLeader("my_topic", 0, seedBroker.BrokerID()),
  124. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  125. })
  126. config := NewConfig()
  127. config.Version = V1_1_0_0
  128. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  129. if err != nil {
  130. t.Fatal(err)
  131. }
  132. entries, err := admin.ListTopics()
  133. if err != nil {
  134. t.Fatal(err)
  135. }
  136. if len(entries) <= 0 {
  137. t.Fatal(errors.New("no resource present"))
  138. }
  139. topic, found := entries["my_topic"]
  140. if !found {
  141. t.Fatal(errors.New("topic not found in response"))
  142. }
  143. _, found = topic.ConfigEntries["max.message.bytes"]
  144. if found {
  145. t.Fatal(errors.New("default topic config entry incorrectly found in response"))
  146. }
  147. value := topic.ConfigEntries["retention.ms"]
  148. if value == nil || *value != "5000" {
  149. t.Fatal(errors.New("non-default topic config entry not found in response"))
  150. }
  151. err = admin.Close()
  152. if err != nil {
  153. t.Fatal(err)
  154. }
  155. if topic.ReplicaAssignment == nil || topic.ReplicaAssignment[0][0] != 1 {
  156. t.Fatal(errors.New("replica assignment not found in response"))
  157. }
  158. }
  159. func TestClusterAdminDeleteTopic(t *testing.T) {
  160. seedBroker := NewMockBroker(t, 1)
  161. defer seedBroker.Close()
  162. seedBroker.SetHandlerByMap(map[string]MockResponse{
  163. "MetadataRequest": NewMockMetadataResponse(t).
  164. SetController(seedBroker.BrokerID()).
  165. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  166. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  167. })
  168. config := NewConfig()
  169. config.Version = V0_10_2_0
  170. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  171. if err != nil {
  172. t.Fatal(err)
  173. }
  174. err = admin.DeleteTopic("my_topic")
  175. if err != nil {
  176. t.Fatal(err)
  177. }
  178. err = admin.Close()
  179. if err != nil {
  180. t.Fatal(err)
  181. }
  182. }
  183. func TestClusterAdminDeleteEmptyTopic(t *testing.T) {
  184. seedBroker := NewMockBroker(t, 1)
  185. defer seedBroker.Close()
  186. seedBroker.SetHandlerByMap(map[string]MockResponse{
  187. "MetadataRequest": NewMockMetadataResponse(t).
  188. SetController(seedBroker.BrokerID()).
  189. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  190. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  191. })
  192. config := NewConfig()
  193. config.Version = V0_10_2_0
  194. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  195. if err != nil {
  196. t.Fatal(err)
  197. }
  198. err = admin.DeleteTopic("")
  199. if err != ErrInvalidTopic {
  200. t.Fatal(err)
  201. }
  202. err = admin.Close()
  203. if err != nil {
  204. t.Fatal(err)
  205. }
  206. }
  207. func TestClusterAdminCreatePartitions(t *testing.T) {
  208. seedBroker := NewMockBroker(t, 1)
  209. defer seedBroker.Close()
  210. seedBroker.SetHandlerByMap(map[string]MockResponse{
  211. "MetadataRequest": NewMockMetadataResponse(t).
  212. SetController(seedBroker.BrokerID()).
  213. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  214. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  215. })
  216. config := NewConfig()
  217. config.Version = V1_0_0_0
  218. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  219. if err != nil {
  220. t.Fatal(err)
  221. }
  222. err = admin.CreatePartitions("my_topic", 3, nil, false)
  223. if err != nil {
  224. t.Fatal(err)
  225. }
  226. err = admin.Close()
  227. if err != nil {
  228. t.Fatal(err)
  229. }
  230. }
  231. func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) {
  232. seedBroker := NewMockBroker(t, 1)
  233. defer seedBroker.Close()
  234. seedBroker.SetHandlerByMap(map[string]MockResponse{
  235. "MetadataRequest": NewMockMetadataResponse(t).
  236. SetController(seedBroker.BrokerID()).
  237. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  238. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  239. })
  240. config := NewConfig()
  241. config.Version = V0_10_2_0
  242. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  243. if err != nil {
  244. t.Fatal(err)
  245. }
  246. err = admin.CreatePartitions("my_topic", 3, nil, false)
  247. if err != ErrUnsupportedVersion {
  248. t.Fatal(err)
  249. }
  250. err = admin.Close()
  251. if err != nil {
  252. t.Fatal(err)
  253. }
  254. }
  255. func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
  256. seedBroker := NewMockBroker(t, 1)
  257. defer seedBroker.Close()
  258. seedBroker.SetHandlerByMap(map[string]MockResponse{
  259. "MetadataRequest": NewMockMetadataResponse(t).
  260. SetController(seedBroker.BrokerID()).
  261. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  262. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  263. })
  264. config := NewConfig()
  265. config.Version = V1_0_0_0
  266. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  267. if err != nil {
  268. t.Fatal(err)
  269. }
  270. err = admin.CreatePartitions("_internal_topic", 3, nil, false)
  271. want := "insufficient permissions to create partition on topic with reserved prefix"
  272. if !strings.HasSuffix(err.Error(), want) {
  273. t.Fatal(err)
  274. }
  275. err = admin.Close()
  276. if err != nil {
  277. t.Fatal(err)
  278. }
  279. }
  280. func TestClusterAdminDeleteRecords(t *testing.T) {
  281. topicName := "my_topic"
  282. seedBroker := NewMockBroker(t, 1)
  283. defer seedBroker.Close()
  284. seedBroker.SetHandlerByMap(map[string]MockResponse{
  285. "MetadataRequest": NewMockMetadataResponse(t).
  286. SetController(seedBroker.BrokerID()).
  287. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  288. SetLeader(topicName, 1, 1).
  289. SetLeader(topicName, 2, 1).
  290. SetLeader(topicName, 3, 1),
  291. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  292. })
  293. config := NewConfig()
  294. config.Version = V1_0_0_0
  295. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  296. if err != nil {
  297. t.Fatal(err)
  298. }
  299. partitionOffsetFake := make(map[int32]int64)
  300. partitionOffsetFake[4] = 1000
  301. errFake := admin.DeleteRecords(topicName, partitionOffsetFake)
  302. if errFake == nil {
  303. t.Fatal(err)
  304. }
  305. partitionOffset := make(map[int32]int64)
  306. partitionOffset[1] = 1000
  307. partitionOffset[2] = 1000
  308. partitionOffset[3] = 1000
  309. err = admin.DeleteRecords(topicName, partitionOffset)
  310. if err != nil {
  311. t.Fatal(err)
  312. }
  313. err = admin.Close()
  314. if err != nil {
  315. t.Fatal(err)
  316. }
  317. }
  318. func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) {
  319. topicName := "my_topic"
  320. seedBroker := NewMockBroker(t, 1)
  321. secondBroker := NewMockBroker(t, 2)
  322. defer seedBroker.Close()
  323. defer secondBroker.Close()
  324. seedBroker.SetHandlerByMap(map[string]MockResponse{
  325. "MetadataRequest": NewMockMetadataResponse(t).
  326. SetController(seedBroker.BrokerID()).
  327. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  328. SetBroker(secondBroker.Addr(), secondBroker.brokerID).
  329. SetLeader(topicName, 1, 1).
  330. SetLeader(topicName, 2, 1).
  331. SetLeader(topicName, 3, 2),
  332. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  333. })
  334. secondBroker.SetHandlerByMap(map[string]MockResponse{
  335. "MetadataRequest": NewMockMetadataResponse(t).
  336. SetController(seedBroker.BrokerID()).
  337. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  338. SetBroker(secondBroker.Addr(), secondBroker.brokerID).
  339. SetLeader(topicName, 1, 1).
  340. SetLeader(topicName, 2, 1).
  341. SetLeader(topicName, 3, 2),
  342. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  343. })
  344. config := NewConfig()
  345. config.Version = V1_0_0_0
  346. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  347. if err != nil {
  348. t.Fatal(err)
  349. }
  350. partitionOffset := make(map[int32]int64)
  351. partitionOffset[1] = 1000
  352. partitionOffset[2] = 1000
  353. partitionOffset[3] = 1000
  354. err = admin.DeleteRecords(topicName, partitionOffset)
  355. if err != nil {
  356. t.Fatal(err)
  357. }
  358. err = admin.Close()
  359. if err != nil {
  360. t.Fatal(err)
  361. }
  362. }
  363. func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
  364. topicName := "my_topic"
  365. seedBroker := NewMockBroker(t, 1)
  366. defer seedBroker.Close()
  367. seedBroker.SetHandlerByMap(map[string]MockResponse{
  368. "MetadataRequest": NewMockMetadataResponse(t).
  369. SetController(seedBroker.BrokerID()).
  370. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  371. SetLeader(topicName, 1, 1).
  372. SetLeader(topicName, 2, 1).
  373. SetLeader(topicName, 3, 1),
  374. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  375. })
  376. config := NewConfig()
  377. config.Version = V0_10_2_0
  378. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  379. if err != nil {
  380. t.Fatal(err)
  381. }
  382. partitionOffset := make(map[int32]int64)
  383. partitionOffset[1] = 1000
  384. partitionOffset[2] = 1000
  385. partitionOffset[3] = 1000
  386. err = admin.DeleteRecords(topicName, partitionOffset)
  387. if !strings.HasPrefix(err.Error(), "kafka server: failed to delete records") {
  388. t.Fatal(err)
  389. }
  390. deleteRecordsError, ok := err.(ErrDeleteRecords)
  391. if !ok {
  392. t.Fatal(err)
  393. }
  394. for _, err := range *deleteRecordsError.Errors {
  395. if err != ErrUnsupportedVersion {
  396. t.Fatal(err)
  397. }
  398. }
  399. err = admin.Close()
  400. if err != nil {
  401. t.Fatal(err)
  402. }
  403. }
  404. func TestClusterAdminDescribeConfig(t *testing.T) {
  405. seedBroker := NewMockBroker(t, 1)
  406. defer seedBroker.Close()
  407. seedBroker.SetHandlerByMap(map[string]MockResponse{
  408. "MetadataRequest": NewMockMetadataResponse(t).
  409. SetController(seedBroker.BrokerID()).
  410. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  411. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  412. })
  413. var tests = []struct {
  414. saramaVersion KafkaVersion
  415. requestVersion int16
  416. includeSynonyms bool
  417. }{
  418. {V1_0_0_0, 0, false},
  419. {V1_1_0_0, 1, true},
  420. {V1_1_1_0, 1, true},
  421. {V2_0_0_0, 2, true},
  422. }
  423. for _, tt := range tests {
  424. config := NewConfig()
  425. config.Version = tt.saramaVersion
  426. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  427. if err != nil {
  428. t.Fatal(err)
  429. }
  430. defer func() {
  431. _ = admin.Close()
  432. }()
  433. resource := ConfigResource{
  434. Name: "r1",
  435. Type: TopicResource,
  436. ConfigNames: []string{"my_topic"},
  437. }
  438. entries, err := admin.DescribeConfig(resource)
  439. if err != nil {
  440. t.Fatal(err)
  441. }
  442. history := seedBroker.History()
  443. describeReq, ok := history[len(history)-1].Request.(*DescribeConfigsRequest)
  444. if !ok {
  445. t.Fatal("failed to find DescribeConfigsRequest in mockBroker history")
  446. }
  447. if describeReq.Version != tt.requestVersion {
  448. t.Fatalf(
  449. "requestVersion %v did not match expected %v",
  450. describeReq.Version, tt.requestVersion)
  451. }
  452. if len(entries) <= 0 {
  453. t.Fatal(errors.New("no resource present"))
  454. }
  455. if tt.includeSynonyms {
  456. if len(entries[0].Synonyms) == 0 {
  457. t.Fatal("expected synonyms to have been included")
  458. }
  459. }
  460. }
  461. }
  462. // TestClusterAdminDescribeBrokerConfig ensures that a describe broker config
  463. // is sent to the broker in the resource struct, _not_ the controller
  464. func TestClusterAdminDescribeBrokerConfig(t *testing.T) {
  465. controllerBroker := NewMockBroker(t, 1)
  466. defer controllerBroker.Close()
  467. configBroker := NewMockBroker(t, 2)
  468. defer configBroker.Close()
  469. controllerBroker.SetHandlerByMap(map[string]MockResponse{
  470. "MetadataRequest": NewMockMetadataResponse(t).
  471. SetController(controllerBroker.BrokerID()).
  472. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
  473. SetBroker(configBroker.Addr(), configBroker.BrokerID()),
  474. })
  475. configBroker.SetHandlerByMap(map[string]MockResponse{
  476. "MetadataRequest": NewMockMetadataResponse(t).
  477. SetController(controllerBroker.BrokerID()).
  478. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
  479. SetBroker(configBroker.Addr(), configBroker.BrokerID()),
  480. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  481. })
  482. config := NewConfig()
  483. config.Version = V1_0_0_0
  484. admin, err := NewClusterAdmin(
  485. []string{
  486. controllerBroker.Addr(),
  487. configBroker.Addr(),
  488. }, config)
  489. if err != nil {
  490. t.Fatal(err)
  491. }
  492. for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
  493. resource := ConfigResource{Name: "2", Type: resourceType}
  494. entries, err := admin.DescribeConfig(resource)
  495. if err != nil {
  496. t.Fatal(err)
  497. }
  498. if len(entries) <= 0 {
  499. t.Fatal(errors.New("no resource present"))
  500. }
  501. }
  502. err = admin.Close()
  503. if err != nil {
  504. t.Fatal(err)
  505. }
  506. }
  507. func TestClusterAdminAlterConfig(t *testing.T) {
  508. seedBroker := NewMockBroker(t, 1)
  509. defer seedBroker.Close()
  510. seedBroker.SetHandlerByMap(map[string]MockResponse{
  511. "MetadataRequest": NewMockMetadataResponse(t).
  512. SetController(seedBroker.BrokerID()).
  513. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  514. "AlterConfigsRequest": NewMockAlterConfigsResponse(t),
  515. })
  516. config := NewConfig()
  517. config.Version = V1_0_0_0
  518. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  519. if err != nil {
  520. t.Fatal(err)
  521. }
  522. var value string
  523. entries := make(map[string]*string)
  524. value = "60000"
  525. entries["retention.ms"] = &value
  526. err = admin.AlterConfig(TopicResource, "my_topic", entries, false)
  527. if err != nil {
  528. t.Fatal(err)
  529. }
  530. err = admin.Close()
  531. if err != nil {
  532. t.Fatal(err)
  533. }
  534. }
  535. func TestClusterAdminAlterBrokerConfig(t *testing.T) {
  536. controllerBroker := NewMockBroker(t, 1)
  537. defer controllerBroker.Close()
  538. configBroker := NewMockBroker(t, 2)
  539. defer configBroker.Close()
  540. controllerBroker.SetHandlerByMap(map[string]MockResponse{
  541. "MetadataRequest": NewMockMetadataResponse(t).
  542. SetController(controllerBroker.BrokerID()).
  543. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
  544. SetBroker(configBroker.Addr(), configBroker.BrokerID()),
  545. })
  546. configBroker.SetHandlerByMap(map[string]MockResponse{
  547. "MetadataRequest": NewMockMetadataResponse(t).
  548. SetController(controllerBroker.BrokerID()).
  549. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
  550. SetBroker(configBroker.Addr(), configBroker.BrokerID()),
  551. "AlterConfigsRequest": NewMockAlterConfigsResponse(t),
  552. })
  553. config := NewConfig()
  554. config.Version = V1_0_0_0
  555. admin, err := NewClusterAdmin(
  556. []string{
  557. controllerBroker.Addr(),
  558. configBroker.Addr(),
  559. }, config)
  560. if err != nil {
  561. t.Fatal(err)
  562. }
  563. var value string
  564. entries := make(map[string]*string)
  565. value = "3"
  566. entries["min.insync.replicas"] = &value
  567. for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
  568. resource := ConfigResource{Name: "2", Type: resourceType}
  569. err = admin.AlterConfig(
  570. resource.Type,
  571. resource.Name,
  572. entries,
  573. false)
  574. if err != nil {
  575. t.Fatal(err)
  576. }
  577. }
  578. err = admin.Close()
  579. if err != nil {
  580. t.Fatal(err)
  581. }
  582. }
  583. func TestClusterAdminCreateAcl(t *testing.T) {
  584. seedBroker := NewMockBroker(t, 1)
  585. defer seedBroker.Close()
  586. seedBroker.SetHandlerByMap(map[string]MockResponse{
  587. "MetadataRequest": NewMockMetadataResponse(t).
  588. SetController(seedBroker.BrokerID()).
  589. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  590. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  591. })
  592. config := NewConfig()
  593. config.Version = V1_0_0_0
  594. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  595. if err != nil {
  596. t.Fatal(err)
  597. }
  598. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  599. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  600. err = admin.CreateACL(r, a)
  601. if err != nil {
  602. t.Fatal(err)
  603. }
  604. err = admin.Close()
  605. if err != nil {
  606. t.Fatal(err)
  607. }
  608. }
  609. func TestClusterAdminListAcls(t *testing.T) {
  610. seedBroker := NewMockBroker(t, 1)
  611. defer seedBroker.Close()
  612. seedBroker.SetHandlerByMap(map[string]MockResponse{
  613. "MetadataRequest": NewMockMetadataResponse(t).
  614. SetController(seedBroker.BrokerID()).
  615. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  616. "DescribeAclsRequest": NewMockListAclsResponse(t),
  617. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  618. })
  619. config := NewConfig()
  620. config.Version = V1_0_0_0
  621. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  622. if err != nil {
  623. t.Fatal(err)
  624. }
  625. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  626. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  627. err = admin.CreateACL(r, a)
  628. if err != nil {
  629. t.Fatal(err)
  630. }
  631. resourceName := "my_topic"
  632. filter := AclFilter{
  633. ResourceType: AclResourceTopic,
  634. Operation: AclOperationRead,
  635. ResourceName: &resourceName,
  636. }
  637. rAcls, err := admin.ListAcls(filter)
  638. if err != nil {
  639. t.Fatal(err)
  640. }
  641. if len(rAcls) <= 0 {
  642. t.Fatal("no acls present")
  643. }
  644. err = admin.Close()
  645. if err != nil {
  646. t.Fatal(err)
  647. }
  648. }
  649. func TestClusterAdminDeleteAcl(t *testing.T) {
  650. seedBroker := NewMockBroker(t, 1)
  651. defer seedBroker.Close()
  652. seedBroker.SetHandlerByMap(map[string]MockResponse{
  653. "MetadataRequest": NewMockMetadataResponse(t).
  654. SetController(seedBroker.BrokerID()).
  655. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  656. "DeleteAclsRequest": NewMockDeleteAclsResponse(t),
  657. })
  658. config := NewConfig()
  659. config.Version = V1_0_0_0
  660. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  661. if err != nil {
  662. t.Fatal(err)
  663. }
  664. resourceName := "my_topic"
  665. filter := AclFilter{
  666. ResourceType: AclResourceTopic,
  667. Operation: AclOperationAlter,
  668. ResourceName: &resourceName,
  669. }
  670. _, err = admin.DeleteACL(filter, false)
  671. if err != nil {
  672. t.Fatal(err)
  673. }
  674. err = admin.Close()
  675. if err != nil {
  676. t.Fatal(err)
  677. }
  678. }
  679. func TestDescribeTopic(t *testing.T) {
  680. seedBroker := NewMockBroker(t, 1)
  681. defer seedBroker.Close()
  682. seedBroker.SetHandlerByMap(map[string]MockResponse{
  683. "MetadataRequest": NewMockMetadataResponse(t).
  684. SetController(seedBroker.BrokerID()).
  685. SetLeader("my_topic", 0, seedBroker.BrokerID()).
  686. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  687. })
  688. config := NewConfig()
  689. config.Version = V1_0_0_0
  690. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  691. if err != nil {
  692. t.Fatal(err)
  693. }
  694. topics, err := admin.DescribeTopics([]string{"my_topic"})
  695. if err != nil {
  696. t.Fatal(err)
  697. }
  698. if len(topics) != 1 {
  699. t.Fatalf("Expected 1 result, got %v", len(topics))
  700. }
  701. if topics[0].Name != "my_topic" {
  702. t.Fatalf("Incorrect topic name: %v", topics[0].Name)
  703. }
  704. err = admin.Close()
  705. if err != nil {
  706. t.Fatal(err)
  707. }
  708. }
  709. func TestDescribeTopicWithVersion0_11(t *testing.T) {
  710. seedBroker := NewMockBroker(t, 1)
  711. defer seedBroker.Close()
  712. seedBroker.SetHandlerByMap(map[string]MockResponse{
  713. "MetadataRequest": NewMockMetadataResponse(t).
  714. SetController(seedBroker.BrokerID()).
  715. SetLeader("my_topic", 0, seedBroker.BrokerID()).
  716. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  717. })
  718. config := NewConfig()
  719. config.Version = V0_11_0_0
  720. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  721. if err != nil {
  722. t.Fatal(err)
  723. }
  724. topics, err := admin.DescribeTopics([]string{"my_topic"})
  725. if err != nil {
  726. t.Fatal(err)
  727. }
  728. if len(topics) != 1 {
  729. t.Fatalf("Expected 1 result, got %v", len(topics))
  730. }
  731. if topics[0].Name != "my_topic" {
  732. t.Fatalf("Incorrect topic name: %v", topics[0].Name)
  733. }
  734. err = admin.Close()
  735. if err != nil {
  736. t.Fatal(err)
  737. }
  738. }
  739. func TestDescribeConsumerGroup(t *testing.T) {
  740. seedBroker := NewMockBroker(t, 1)
  741. defer seedBroker.Close()
  742. expectedGroupID := "my-group"
  743. seedBroker.SetHandlerByMap(map[string]MockResponse{
  744. "DescribeGroupsRequest": NewMockDescribeGroupsResponse(t).AddGroupDescription(expectedGroupID, &GroupDescription{
  745. GroupId: expectedGroupID,
  746. }),
  747. "MetadataRequest": NewMockMetadataResponse(t).
  748. SetController(seedBroker.BrokerID()).
  749. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  750. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker),
  751. })
  752. config := NewConfig()
  753. config.Version = V1_0_0_0
  754. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  755. if err != nil {
  756. t.Fatal(err)
  757. }
  758. result, err := admin.DescribeConsumerGroups([]string{expectedGroupID})
  759. if err != nil {
  760. t.Fatal(err)
  761. }
  762. if len(result) != 1 {
  763. t.Fatalf("Expected 1 result, got %v", len(result))
  764. }
  765. if result[0].GroupId != expectedGroupID {
  766. t.Fatalf("Expected groupID %v, got %v", expectedGroupID, result[0].GroupId)
  767. }
  768. err = admin.Close()
  769. if err != nil {
  770. t.Fatal(err)
  771. }
  772. }
  773. func TestListConsumerGroups(t *testing.T) {
  774. seedBroker := NewMockBroker(t, 1)
  775. defer seedBroker.Close()
  776. seedBroker.SetHandlerByMap(map[string]MockResponse{
  777. "MetadataRequest": NewMockMetadataResponse(t).
  778. SetController(seedBroker.BrokerID()).
  779. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  780. "ListGroupsRequest": NewMockListGroupsResponse(t).
  781. AddGroup("my-group", "consumer"),
  782. })
  783. config := NewConfig()
  784. config.Version = V1_0_0_0
  785. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  786. if err != nil {
  787. t.Fatal(err)
  788. }
  789. groups, err := admin.ListConsumerGroups()
  790. if err != nil {
  791. t.Fatal(err)
  792. }
  793. if len(groups) != 1 {
  794. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  795. }
  796. protocolType, ok := groups["my-group"]
  797. if !ok {
  798. t.Fatal("Expected group to be returned, but it did not")
  799. }
  800. if protocolType != "consumer" {
  801. t.Fatalf("Expected protocolType %v, got %v", "consumer", protocolType)
  802. }
  803. err = admin.Close()
  804. if err != nil {
  805. t.Fatal(err)
  806. }
  807. }
  808. func TestListConsumerGroupsMultiBroker(t *testing.T) {
  809. seedBroker := NewMockBroker(t, 1)
  810. defer seedBroker.Close()
  811. secondBroker := NewMockBroker(t, 2)
  812. defer secondBroker.Close()
  813. firstGroup := "first"
  814. secondGroup := "second"
  815. nonExistingGroup := "non-existing-group"
  816. seedBroker.SetHandlerByMap(map[string]MockResponse{
  817. "MetadataRequest": NewMockMetadataResponse(t).
  818. SetController(seedBroker.BrokerID()).
  819. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  820. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  821. "ListGroupsRequest": NewMockListGroupsResponse(t).
  822. AddGroup(firstGroup, "consumer"),
  823. })
  824. secondBroker.SetHandlerByMap(map[string]MockResponse{
  825. "MetadataRequest": NewMockMetadataResponse(t).
  826. SetController(seedBroker.BrokerID()).
  827. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  828. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  829. "ListGroupsRequest": NewMockListGroupsResponse(t).
  830. AddGroup(secondGroup, "consumer"),
  831. })
  832. config := NewConfig()
  833. config.Version = V1_0_0_0
  834. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  835. if err != nil {
  836. t.Fatal(err)
  837. }
  838. groups, err := admin.ListConsumerGroups()
  839. if err != nil {
  840. t.Fatal(err)
  841. }
  842. if len(groups) != 2 {
  843. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  844. }
  845. if _, found := groups[firstGroup]; !found {
  846. t.Fatalf("Expected group %v to be present in result set, but it isn't", firstGroup)
  847. }
  848. if _, found := groups[secondGroup]; !found {
  849. t.Fatalf("Expected group %v to be present in result set, but it isn't", secondGroup)
  850. }
  851. if _, found := groups[nonExistingGroup]; found {
  852. t.Fatalf("Expected group %v to not exist, but it exists", nonExistingGroup)
  853. }
  854. err = admin.Close()
  855. if err != nil {
  856. t.Fatal(err)
  857. }
  858. }
  859. func TestListConsumerGroupOffsets(t *testing.T) {
  860. seedBroker := NewMockBroker(t, 1)
  861. defer seedBroker.Close()
  862. group := "my-group"
  863. topic := "my-topic"
  864. partition := int32(0)
  865. expectedOffset := int64(0)
  866. seedBroker.SetHandlerByMap(map[string]MockResponse{
  867. "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError).SetError(ErrNoError),
  868. "MetadataRequest": NewMockMetadataResponse(t).
  869. SetController(seedBroker.BrokerID()).
  870. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  871. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
  872. })
  873. config := NewConfig()
  874. config.Version = V1_0_0_0
  875. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  876. if err != nil {
  877. t.Fatal(err)
  878. }
  879. response, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{
  880. topic: {0},
  881. })
  882. if err != nil {
  883. t.Fatalf("ListConsumerGroupOffsets failed with error %v", err)
  884. }
  885. block := response.GetBlock(topic, partition)
  886. if block == nil {
  887. t.Fatalf("Expected block for topic %v and partition %v to exist, but it doesn't", topic, partition)
  888. }
  889. if block.Offset != expectedOffset {
  890. t.Fatalf("Expected offset %v, got %v", expectedOffset, block.Offset)
  891. }
  892. err = admin.Close()
  893. if err != nil {
  894. t.Fatal(err)
  895. }
  896. }
  897. func TestDeleteConsumerGroup(t *testing.T) {
  898. seedBroker := NewMockBroker(t, 1)
  899. defer seedBroker.Close()
  900. group := "my-group"
  901. seedBroker.SetHandlerByMap(map[string]MockResponse{
  902. // "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
  903. "DeleteGroupsRequest": NewMockDeleteGroupsRequest(t).SetDeletedGroups([]string{group}),
  904. "MetadataRequest": NewMockMetadataResponse(t).
  905. SetController(seedBroker.BrokerID()).
  906. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  907. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
  908. })
  909. config := NewConfig()
  910. config.Version = V1_1_0_0
  911. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  912. if err != nil {
  913. t.Fatal(err)
  914. }
  915. err = admin.DeleteConsumerGroup(group)
  916. if err != nil {
  917. t.Fatalf("DeleteConsumerGroup failed with error %v", err)
  918. }
  919. }
  920. // TestRefreshMetaDataWithDifferentController ensures that the cached
  921. // controller can be forcibly updated from Metadata by the admin client
  922. func TestRefreshMetaDataWithDifferentController(t *testing.T) {
  923. seedBroker1 := NewMockBroker(t, 1)
  924. seedBroker2 := NewMockBroker(t, 2)
  925. defer seedBroker1.Close()
  926. defer seedBroker2.Close()
  927. seedBroker1.SetHandlerByMap(map[string]MockResponse{
  928. "MetadataRequest": NewMockMetadataResponse(t).
  929. SetController(seedBroker1.BrokerID()).
  930. SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
  931. SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
  932. })
  933. config := NewConfig()
  934. config.Version = V1_1_0_0
  935. client, err := NewClient([]string{seedBroker1.Addr()}, config)
  936. if err != nil {
  937. t.Fatal(err)
  938. }
  939. ca := clusterAdmin{client: client, conf: config}
  940. if b, _ := ca.Controller(); seedBroker1.BrokerID() != b.ID() {
  941. t.Fatalf("expected cached controller to be %d rather than %d",
  942. seedBroker1.BrokerID(), b.ID())
  943. }
  944. seedBroker1.SetHandlerByMap(map[string]MockResponse{
  945. "MetadataRequest": NewMockMetadataResponse(t).
  946. SetController(seedBroker2.BrokerID()).
  947. SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
  948. SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
  949. })
  950. if b, _ := ca.refreshController(); seedBroker2.BrokerID() != b.ID() {
  951. t.Fatalf("expected refreshed controller to be %d rather than %d",
  952. seedBroker2.BrokerID(), b.ID())
  953. }
  954. if b, _ := ca.Controller(); seedBroker2.BrokerID() != b.ID() {
  955. t.Fatalf("expected cached controller to be %d rather than %d",
  956. seedBroker2.BrokerID(), b.ID())
  957. }
  958. }