admin_test.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113
  1. package sarama
  2. import (
  3. "errors"
  4. "fmt"
  5. "io/ioutil"
  6. "log"
  7. "os"
  8. "strings"
  9. "testing"
  10. )
  11. func TestClusterAdmin(t *testing.T) {
  12. seedBroker := NewMockBroker(t, 1)
  13. defer seedBroker.Close()
  14. seedBroker.SetHandlerByMap(map[string]MockResponse{
  15. "MetadataRequest": NewMockMetadataResponse(t).
  16. SetController(seedBroker.BrokerID()).
  17. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  18. })
  19. config := NewConfig()
  20. config.Version = V1_0_0_0
  21. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  22. if err != nil {
  23. t.Fatal(err)
  24. }
  25. err = admin.Close()
  26. if err != nil {
  27. t.Fatal(err)
  28. }
  29. }
  30. func TestClusterAdminInvalidController(t *testing.T) {
  31. seedBroker := NewMockBroker(t, 1)
  32. defer seedBroker.Close()
  33. seedBroker.SetHandlerByMap(map[string]MockResponse{
  34. "MetadataRequest": NewMockMetadataResponse(t).
  35. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  36. })
  37. config := NewConfig()
  38. config.Version = V1_0_0_0
  39. _, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  40. if err == nil {
  41. t.Fatal(errors.New("controller not set still cluster admin was created"))
  42. }
  43. if err != ErrControllerNotAvailable {
  44. t.Fatal(err)
  45. }
  46. }
  47. func TestClusterAdminCreateTopic(t *testing.T) {
  48. seedBroker := NewMockBroker(t, 1)
  49. defer seedBroker.Close()
  50. seedBroker.SetHandlerByMap(map[string]MockResponse{
  51. "MetadataRequest": NewMockMetadataResponse(t).
  52. SetController(seedBroker.BrokerID()).
  53. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  54. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  55. })
  56. config := NewConfig()
  57. config.Version = V0_10_2_0
  58. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  59. if err != nil {
  60. t.Fatal(err)
  61. }
  62. err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  63. if err != nil {
  64. t.Fatal(err)
  65. }
  66. err = admin.Close()
  67. if err != nil {
  68. t.Fatal(err)
  69. }
  70. }
  71. func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) {
  72. seedBroker := NewMockBroker(t, 1)
  73. defer seedBroker.Close()
  74. seedBroker.SetHandlerByMap(map[string]MockResponse{
  75. "MetadataRequest": NewMockMetadataResponse(t).
  76. SetController(seedBroker.BrokerID()).
  77. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  78. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  79. })
  80. config := NewConfig()
  81. config.Version = V0_10_2_0
  82. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  83. if err != nil {
  84. t.Fatal(err)
  85. }
  86. err = admin.CreateTopic("my_topic", nil, false)
  87. if err.Error() != "you must specify topic details" {
  88. t.Fatal(err)
  89. }
  90. err = admin.Close()
  91. if err != nil {
  92. t.Fatal(err)
  93. }
  94. }
  95. func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) {
  96. seedBroker := NewMockBroker(t, 1)
  97. defer seedBroker.Close()
  98. seedBroker.SetHandlerByMap(map[string]MockResponse{
  99. "MetadataRequest": NewMockMetadataResponse(t).
  100. SetController(seedBroker.BrokerID()).
  101. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  102. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  103. })
  104. config := NewConfig()
  105. config.Version = V0_11_0_0
  106. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  107. if err != nil {
  108. t.Fatal(err)
  109. }
  110. err = admin.CreateTopic("_internal_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  111. want := "insufficient permissions to create topic with reserved prefix"
  112. if !strings.HasSuffix(err.Error(), want) {
  113. t.Fatal(err)
  114. }
  115. err = admin.Close()
  116. if err != nil {
  117. t.Fatal(err)
  118. }
  119. }
  120. func TestClusterAdminListTopics(t *testing.T) {
  121. seedBroker := NewMockBroker(t, 1)
  122. defer seedBroker.Close()
  123. seedBroker.SetHandlerByMap(map[string]MockResponse{
  124. "MetadataRequest": NewMockMetadataResponse(t).
  125. SetController(seedBroker.BrokerID()).
  126. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  127. SetLeader("my_topic", 0, seedBroker.BrokerID()),
  128. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  129. })
  130. config := NewConfig()
  131. config.Version = V1_0_0_0
  132. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  133. if err != nil {
  134. t.Fatal(err)
  135. }
  136. entries, err := admin.ListTopics()
  137. if err != nil {
  138. t.Fatal(err)
  139. }
  140. if len(entries) <= 0 {
  141. t.Fatal(errors.New("no resource present"))
  142. }
  143. topic, found := entries["my_topic"]
  144. if !found {
  145. t.Fatal(errors.New("topic not found in response"))
  146. }
  147. _, found = topic.ConfigEntries["max.message.bytes"]
  148. if found {
  149. t.Fatal(errors.New("default topic config entry incorrectly found in response"))
  150. }
  151. value, _ := topic.ConfigEntries["retention.ms"]
  152. if value == nil || *value != "5000" {
  153. t.Fatal(errors.New("non-default topic config entry not found in response"))
  154. }
  155. err = admin.Close()
  156. if err != nil {
  157. t.Fatal(err)
  158. }
  159. if topic.ReplicaAssignment == nil || topic.ReplicaAssignment[0][0] != 1 {
  160. t.Fatal(errors.New("replica assignment not found in response"))
  161. }
  162. }
  163. func TestClusterAdminDeleteTopic(t *testing.T) {
  164. seedBroker := NewMockBroker(t, 1)
  165. defer seedBroker.Close()
  166. seedBroker.SetHandlerByMap(map[string]MockResponse{
  167. "MetadataRequest": NewMockMetadataResponse(t).
  168. SetController(seedBroker.BrokerID()).
  169. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  170. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  171. })
  172. config := NewConfig()
  173. config.Version = V0_10_2_0
  174. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  175. if err != nil {
  176. t.Fatal(err)
  177. }
  178. err = admin.DeleteTopic("my_topic")
  179. if err != nil {
  180. t.Fatal(err)
  181. }
  182. err = admin.Close()
  183. if err != nil {
  184. t.Fatal(err)
  185. }
  186. }
  187. func TestClusterAdminDeleteEmptyTopic(t *testing.T) {
  188. seedBroker := NewMockBroker(t, 1)
  189. defer seedBroker.Close()
  190. seedBroker.SetHandlerByMap(map[string]MockResponse{
  191. "MetadataRequest": NewMockMetadataResponse(t).
  192. SetController(seedBroker.BrokerID()).
  193. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  194. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  195. })
  196. config := NewConfig()
  197. config.Version = V0_10_2_0
  198. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  199. if err != nil {
  200. t.Fatal(err)
  201. }
  202. err = admin.DeleteTopic("")
  203. if err != ErrInvalidTopic {
  204. t.Fatal(err)
  205. }
  206. err = admin.Close()
  207. if err != nil {
  208. t.Fatal(err)
  209. }
  210. }
  211. func TestClusterAdminCreatePartitions(t *testing.T) {
  212. seedBroker := NewMockBroker(t, 1)
  213. defer seedBroker.Close()
  214. seedBroker.SetHandlerByMap(map[string]MockResponse{
  215. "MetadataRequest": NewMockMetadataResponse(t).
  216. SetController(seedBroker.BrokerID()).
  217. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  218. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  219. })
  220. config := NewConfig()
  221. config.Version = V1_0_0_0
  222. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  223. if err != nil {
  224. t.Fatal(err)
  225. }
  226. err = admin.CreatePartitions("my_topic", 3, nil, false)
  227. if err != nil {
  228. t.Fatal(err)
  229. }
  230. err = admin.Close()
  231. if err != nil {
  232. t.Fatal(err)
  233. }
  234. }
  235. func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) {
  236. seedBroker := NewMockBroker(t, 1)
  237. defer seedBroker.Close()
  238. seedBroker.SetHandlerByMap(map[string]MockResponse{
  239. "MetadataRequest": NewMockMetadataResponse(t).
  240. SetController(seedBroker.BrokerID()).
  241. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  242. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  243. })
  244. config := NewConfig()
  245. config.Version = V0_10_2_0
  246. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  247. if err != nil {
  248. t.Fatal(err)
  249. }
  250. err = admin.CreatePartitions("my_topic", 3, nil, false)
  251. if err != ErrUnsupportedVersion {
  252. t.Fatal(err)
  253. }
  254. err = admin.Close()
  255. if err != nil {
  256. t.Fatal(err)
  257. }
  258. }
  259. func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
  260. seedBroker := NewMockBroker(t, 1)
  261. defer seedBroker.Close()
  262. seedBroker.SetHandlerByMap(map[string]MockResponse{
  263. "MetadataRequest": NewMockMetadataResponse(t).
  264. SetController(seedBroker.BrokerID()).
  265. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  266. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  267. })
  268. config := NewConfig()
  269. config.Version = V1_0_0_0
  270. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  271. if err != nil {
  272. t.Fatal(err)
  273. }
  274. err = admin.CreatePartitions("_internal_topic", 3, nil, false)
  275. want := "insufficient permissions to create partition on topic with reserved prefix"
  276. if !strings.HasSuffix(err.Error(), want) {
  277. t.Fatal(err)
  278. }
  279. err = admin.Close()
  280. if err != nil {
  281. t.Fatal(err)
  282. }
  283. }
  284. func TestClusterAdminDeleteRecords(t *testing.T) {
  285. topicName := "my_topic"
  286. seedBroker := NewMockBroker(t, 1)
  287. defer seedBroker.Close()
  288. seedBroker.SetHandlerByMap(map[string]MockResponse{
  289. "MetadataRequest": NewMockMetadataResponse(t).
  290. SetController(seedBroker.BrokerID()).
  291. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  292. SetLeader(topicName, 1, 1).
  293. SetLeader(topicName, 2, 1).
  294. SetLeader(topicName, 3, 1),
  295. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  296. })
  297. config := NewConfig()
  298. config.Version = V1_0_0_0
  299. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  300. if err != nil {
  301. t.Fatal(err)
  302. }
  303. partitionOffsetFake := make(map[int32]int64)
  304. partitionOffsetFake[4] = 1000
  305. errFake := admin.DeleteRecords(topicName, partitionOffsetFake)
  306. if errFake == nil {
  307. t.Fatal(err)
  308. }
  309. partitionOffset := make(map[int32]int64)
  310. partitionOffset[1] = 1000
  311. partitionOffset[2] = 1000
  312. partitionOffset[3] = 1000
  313. err = admin.DeleteRecords(topicName, partitionOffset)
  314. if err != nil {
  315. t.Fatal(err)
  316. }
  317. err = admin.Close()
  318. if err != nil {
  319. t.Fatal(err)
  320. }
  321. }
  322. func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) {
  323. topicName := "my_topic"
  324. seedBroker := NewMockBroker(t, 1)
  325. secondBroker := NewMockBroker(t, 2)
  326. defer seedBroker.Close()
  327. defer secondBroker.Close()
  328. seedBroker.SetHandlerByMap(map[string]MockResponse{
  329. "MetadataRequest": NewMockMetadataResponse(t).
  330. SetController(seedBroker.BrokerID()).
  331. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  332. SetBroker(secondBroker.Addr(), secondBroker.brokerID).
  333. SetLeader(topicName, 1, 1).
  334. SetLeader(topicName, 2, 1).
  335. SetLeader(topicName, 3, 2),
  336. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  337. })
  338. secondBroker.SetHandlerByMap(map[string]MockResponse{
  339. "MetadataRequest": NewMockMetadataResponse(t).
  340. SetController(seedBroker.BrokerID()).
  341. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  342. SetBroker(secondBroker.Addr(), secondBroker.brokerID).
  343. SetLeader(topicName, 1, 1).
  344. SetLeader(topicName, 2, 1).
  345. SetLeader(topicName, 3, 2),
  346. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  347. })
  348. config := NewConfig()
  349. config.Version = V1_0_0_0
  350. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  351. if err != nil {
  352. t.Fatal(err)
  353. }
  354. partitionOffset := make(map[int32]int64)
  355. partitionOffset[1] = 1000
  356. partitionOffset[2] = 1000
  357. partitionOffset[3] = 1000
  358. err = admin.DeleteRecords(topicName, partitionOffset)
  359. if err != nil {
  360. t.Fatal(err)
  361. }
  362. err = admin.Close()
  363. if err != nil {
  364. t.Fatal(err)
  365. }
  366. }
  367. func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
  368. topicName := "my_topic"
  369. seedBroker := NewMockBroker(t, 1)
  370. defer seedBroker.Close()
  371. seedBroker.SetHandlerByMap(map[string]MockResponse{
  372. "MetadataRequest": NewMockMetadataResponse(t).
  373. SetController(seedBroker.BrokerID()).
  374. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  375. SetLeader(topicName, 1, 1).
  376. SetLeader(topicName, 2, 1).
  377. SetLeader(topicName, 3, 1),
  378. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  379. })
  380. config := NewConfig()
  381. config.Version = V0_10_2_0
  382. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  383. if err != nil {
  384. t.Fatal(err)
  385. }
  386. partitionOffset := make(map[int32]int64)
  387. partitionOffset[1] = 1000
  388. partitionOffset[2] = 1000
  389. partitionOffset[3] = 1000
  390. err = admin.DeleteRecords(topicName, partitionOffset)
  391. if !strings.HasPrefix(err.Error(), "kafka server: failed to delete records") {
  392. t.Fatal(err)
  393. }
  394. deleteRecordsError, ok := err.(ErrDeleteRecords)
  395. if !ok {
  396. t.Fatal(err)
  397. }
  398. for _, err := range *deleteRecordsError.Errors {
  399. if err != ErrUnsupportedVersion {
  400. t.Fatal(err)
  401. }
  402. }
  403. err = admin.Close()
  404. if err != nil {
  405. t.Fatal(err)
  406. }
  407. }
  408. func TestClusterAdminDescribeConfig(t *testing.T) {
  409. seedBroker := NewMockBroker(t, 1)
  410. defer seedBroker.Close()
  411. seedBroker.SetHandlerByMap(map[string]MockResponse{
  412. "MetadataRequest": NewMockMetadataResponse(t).
  413. SetController(seedBroker.BrokerID()).
  414. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  415. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  416. })
  417. var tests = []struct {
  418. saramaVersion KafkaVersion
  419. requestVersion int16
  420. includeSynonyms bool
  421. }{
  422. {V1_0_0_0, 0, false},
  423. {V1_1_0_0, 1, true},
  424. {V1_1_1_0, 1, true},
  425. {V2_0_0_0, 2, true},
  426. }
  427. for _, tt := range tests {
  428. config := NewConfig()
  429. config.Version = tt.saramaVersion
  430. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  431. if err != nil {
  432. t.Fatal(err)
  433. }
  434. defer func() {
  435. _ = admin.Close()
  436. }()
  437. resource := ConfigResource{
  438. Name: "r1",
  439. Type: TopicResource,
  440. ConfigNames: []string{"my_topic"},
  441. }
  442. entries, err := admin.DescribeConfig(resource)
  443. if err != nil {
  444. t.Fatal(err)
  445. }
  446. history := seedBroker.History()
  447. describeReq, ok := history[len(history)-1].Request.(*DescribeConfigsRequest)
  448. if !ok {
  449. t.Fatal("failed to find DescribeConfigsRequest in mockBroker history")
  450. }
  451. if describeReq.Version != tt.requestVersion {
  452. t.Fatalf(
  453. "requestVersion %v did not match expected %v",
  454. describeReq.Version, tt.requestVersion)
  455. }
  456. if len(entries) <= 0 {
  457. t.Fatal(errors.New("no resource present"))
  458. }
  459. if tt.includeSynonyms {
  460. if len(entries[0].Synonyms) == 0 {
  461. t.Fatal("expected synonyms to have been included")
  462. }
  463. }
  464. }
  465. }
  466. // TestClusterAdminDescribeBrokerConfig ensures that a describe broker config
  467. // is sent to the broker in the resource struct, _not_ the controller
  468. func TestClusterAdminDescribeBrokerConfig(t *testing.T) {
  469. Logger = log.New(os.Stdout, fmt.Sprintf("[%s] ", t.Name()), log.LstdFlags)
  470. defer func() { Logger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags) }()
  471. controllerBroker := NewMockBroker(t, 1)
  472. defer controllerBroker.Close()
  473. configBroker := NewMockBroker(t, 2)
  474. defer configBroker.Close()
  475. controllerBroker.SetHandlerByMap(map[string]MockResponse{
  476. "MetadataRequest": NewMockMetadataResponse(t).
  477. SetController(controllerBroker.BrokerID()).
  478. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
  479. SetBroker(configBroker.Addr(), configBroker.BrokerID()),
  480. })
  481. configBroker.SetHandlerByMap(map[string]MockResponse{
  482. "MetadataRequest": NewMockMetadataResponse(t).
  483. SetController(controllerBroker.BrokerID()).
  484. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
  485. SetBroker(configBroker.Addr(), configBroker.BrokerID()),
  486. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  487. })
  488. config := NewConfig()
  489. config.Version = V1_0_0_0
  490. admin, err := NewClusterAdmin(
  491. []string{
  492. controllerBroker.Addr(),
  493. configBroker.Addr(),
  494. }, config)
  495. if err != nil {
  496. t.Fatal(err)
  497. }
  498. for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
  499. resource := ConfigResource{Name: "2", Type: resourceType}
  500. entries, err := admin.DescribeConfig(resource)
  501. if err != nil {
  502. t.Fatal(err)
  503. }
  504. if len(entries) <= 0 {
  505. t.Fatal(errors.New("no resource present"))
  506. }
  507. }
  508. err = admin.Close()
  509. if err != nil {
  510. t.Fatal(err)
  511. }
  512. }
  513. func TestClusterAdminAlterConfig(t *testing.T) {
  514. seedBroker := NewMockBroker(t, 1)
  515. defer seedBroker.Close()
  516. seedBroker.SetHandlerByMap(map[string]MockResponse{
  517. "MetadataRequest": NewMockMetadataResponse(t).
  518. SetController(seedBroker.BrokerID()).
  519. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  520. "AlterConfigsRequest": NewMockAlterConfigsResponse(t),
  521. })
  522. config := NewConfig()
  523. config.Version = V1_0_0_0
  524. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  525. if err != nil {
  526. t.Fatal(err)
  527. }
  528. var value string
  529. entries := make(map[string]*string)
  530. value = "3"
  531. entries["ReplicationFactor"] = &value
  532. err = admin.AlterConfig(TopicResource, "my_topic", entries, false)
  533. if err != nil {
  534. t.Fatal(err)
  535. }
  536. err = admin.Close()
  537. if err != nil {
  538. t.Fatal(err)
  539. }
  540. }
  541. func TestClusterAdminAlterBrokerConfig(t *testing.T) {
  542. controllerBroker := NewMockBroker(t, 1)
  543. defer controllerBroker.Close()
  544. configBroker := NewMockBroker(t, 2)
  545. defer configBroker.Close()
  546. controllerBroker.SetHandlerByMap(map[string]MockResponse{
  547. "MetadataRequest": NewMockMetadataResponse(t).
  548. SetController(controllerBroker.BrokerID()).
  549. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
  550. SetBroker(configBroker.Addr(), configBroker.BrokerID()),
  551. })
  552. configBroker.SetHandlerByMap(map[string]MockResponse{
  553. "MetadataRequest": NewMockMetadataResponse(t).
  554. SetController(controllerBroker.BrokerID()).
  555. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
  556. SetBroker(configBroker.Addr(), configBroker.BrokerID()),
  557. "AlterConfigsRequest": NewMockAlterConfigsResponse(t),
  558. })
  559. config := NewConfig()
  560. config.Version = V1_0_0_0
  561. admin, err := NewClusterAdmin(
  562. []string{
  563. controllerBroker.Addr(),
  564. configBroker.Addr(),
  565. }, config)
  566. if err != nil {
  567. t.Fatal(err)
  568. }
  569. var value string
  570. entries := make(map[string]*string)
  571. value = "3"
  572. entries["min.insync.replicas"] = &value
  573. for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
  574. resource := ConfigResource{Name: "2", Type: resourceType}
  575. err = admin.AlterConfig(
  576. resource.Type,
  577. resource.Name,
  578. entries,
  579. false)
  580. if err != nil {
  581. t.Fatal(err)
  582. }
  583. }
  584. err = admin.Close()
  585. if err != nil {
  586. t.Fatal(err)
  587. }
  588. }
  589. func TestClusterAdminCreateAcl(t *testing.T) {
  590. seedBroker := NewMockBroker(t, 1)
  591. defer seedBroker.Close()
  592. seedBroker.SetHandlerByMap(map[string]MockResponse{
  593. "MetadataRequest": NewMockMetadataResponse(t).
  594. SetController(seedBroker.BrokerID()).
  595. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  596. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  597. })
  598. config := NewConfig()
  599. config.Version = V1_0_0_0
  600. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  601. if err != nil {
  602. t.Fatal(err)
  603. }
  604. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  605. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  606. err = admin.CreateACL(r, a)
  607. if err != nil {
  608. t.Fatal(err)
  609. }
  610. err = admin.Close()
  611. if err != nil {
  612. t.Fatal(err)
  613. }
  614. }
  615. func TestClusterAdminListAcls(t *testing.T) {
  616. seedBroker := NewMockBroker(t, 1)
  617. defer seedBroker.Close()
  618. seedBroker.SetHandlerByMap(map[string]MockResponse{
  619. "MetadataRequest": NewMockMetadataResponse(t).
  620. SetController(seedBroker.BrokerID()).
  621. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  622. "DescribeAclsRequest": NewMockListAclsResponse(t),
  623. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  624. })
  625. config := NewConfig()
  626. config.Version = V1_0_0_0
  627. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  628. if err != nil {
  629. t.Fatal(err)
  630. }
  631. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  632. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  633. err = admin.CreateACL(r, a)
  634. if err != nil {
  635. t.Fatal(err)
  636. }
  637. resourceName := "my_topic"
  638. filter := AclFilter{
  639. ResourceType: AclResourceTopic,
  640. Operation: AclOperationRead,
  641. ResourceName: &resourceName,
  642. }
  643. rAcls, err := admin.ListAcls(filter)
  644. if err != nil {
  645. t.Fatal(err)
  646. }
  647. if len(rAcls) <= 0 {
  648. t.Fatal("no acls present")
  649. }
  650. err = admin.Close()
  651. if err != nil {
  652. t.Fatal(err)
  653. }
  654. }
  655. func TestClusterAdminDeleteAcl(t *testing.T) {
  656. seedBroker := NewMockBroker(t, 1)
  657. defer seedBroker.Close()
  658. seedBroker.SetHandlerByMap(map[string]MockResponse{
  659. "MetadataRequest": NewMockMetadataResponse(t).
  660. SetController(seedBroker.BrokerID()).
  661. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  662. "DeleteAclsRequest": NewMockDeleteAclsResponse(t),
  663. })
  664. config := NewConfig()
  665. config.Version = V1_0_0_0
  666. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  667. if err != nil {
  668. t.Fatal(err)
  669. }
  670. resourceName := "my_topic"
  671. filter := AclFilter{
  672. ResourceType: AclResourceTopic,
  673. Operation: AclOperationAlter,
  674. ResourceName: &resourceName,
  675. }
  676. _, err = admin.DeleteACL(filter, false)
  677. if err != nil {
  678. t.Fatal(err)
  679. }
  680. err = admin.Close()
  681. if err != nil {
  682. t.Fatal(err)
  683. }
  684. }
  685. func TestDescribeTopic(t *testing.T) {
  686. seedBroker := NewMockBroker(t, 1)
  687. defer seedBroker.Close()
  688. seedBroker.SetHandlerByMap(map[string]MockResponse{
  689. "MetadataRequest": NewMockMetadataResponse(t).
  690. SetController(seedBroker.BrokerID()).
  691. SetLeader("my_topic", 0, seedBroker.BrokerID()).
  692. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  693. })
  694. config := NewConfig()
  695. config.Version = V1_0_0_0
  696. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  697. if err != nil {
  698. t.Fatal(err)
  699. }
  700. topics, err := admin.DescribeTopics([]string{"my_topic"})
  701. if err != nil {
  702. t.Fatal(err)
  703. }
  704. if len(topics) != 1 {
  705. t.Fatalf("Expected 1 result, got %v", len(topics))
  706. }
  707. if topics[0].Name != "my_topic" {
  708. t.Fatalf("Incorrect topic name: %v", topics[0].Name)
  709. }
  710. err = admin.Close()
  711. if err != nil {
  712. t.Fatal(err)
  713. }
  714. }
  715. func TestDescribeTopicWithVersion0_11(t *testing.T) {
  716. seedBroker := NewMockBroker(t, 1)
  717. defer seedBroker.Close()
  718. seedBroker.SetHandlerByMap(map[string]MockResponse{
  719. "MetadataRequest": NewMockMetadataResponse(t).
  720. SetController(seedBroker.BrokerID()).
  721. SetLeader("my_topic", 0, seedBroker.BrokerID()).
  722. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  723. })
  724. config := NewConfig()
  725. config.Version = V0_11_0_0
  726. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  727. if err != nil {
  728. t.Fatal(err)
  729. }
  730. topics, err := admin.DescribeTopics([]string{"my_topic"})
  731. if err != nil {
  732. t.Fatal(err)
  733. }
  734. if len(topics) != 1 {
  735. t.Fatalf("Expected 1 result, got %v", len(topics))
  736. }
  737. if topics[0].Name != "my_topic" {
  738. t.Fatalf("Incorrect topic name: %v", topics[0].Name)
  739. }
  740. err = admin.Close()
  741. if err != nil {
  742. t.Fatal(err)
  743. }
  744. }
  745. func TestDescribeConsumerGroup(t *testing.T) {
  746. seedBroker := NewMockBroker(t, 1)
  747. defer seedBroker.Close()
  748. expectedGroupID := "my-group"
  749. seedBroker.SetHandlerByMap(map[string]MockResponse{
  750. "DescribeGroupsRequest": NewMockDescribeGroupsResponse(t).AddGroupDescription(expectedGroupID, &GroupDescription{
  751. GroupId: expectedGroupID,
  752. }),
  753. "MetadataRequest": NewMockMetadataResponse(t).
  754. SetController(seedBroker.BrokerID()).
  755. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  756. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker),
  757. })
  758. config := NewConfig()
  759. config.Version = V1_0_0_0
  760. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  761. if err != nil {
  762. t.Fatal(err)
  763. }
  764. result, err := admin.DescribeConsumerGroups([]string{expectedGroupID})
  765. if err != nil {
  766. t.Fatal(err)
  767. }
  768. if len(result) != 1 {
  769. t.Fatalf("Expected 1 result, got %v", len(result))
  770. }
  771. if result[0].GroupId != expectedGroupID {
  772. t.Fatalf("Expected groupID %v, got %v", expectedGroupID, result[0].GroupId)
  773. }
  774. err = admin.Close()
  775. if err != nil {
  776. t.Fatal(err)
  777. }
  778. }
  779. func TestListConsumerGroups(t *testing.T) {
  780. seedBroker := NewMockBroker(t, 1)
  781. defer seedBroker.Close()
  782. seedBroker.SetHandlerByMap(map[string]MockResponse{
  783. "MetadataRequest": NewMockMetadataResponse(t).
  784. SetController(seedBroker.BrokerID()).
  785. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  786. "ListGroupsRequest": NewMockListGroupsResponse(t).
  787. AddGroup("my-group", "consumer"),
  788. })
  789. config := NewConfig()
  790. config.Version = V1_0_0_0
  791. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  792. if err != nil {
  793. t.Fatal(err)
  794. }
  795. groups, err := admin.ListConsumerGroups()
  796. if err != nil {
  797. t.Fatal(err)
  798. }
  799. if len(groups) != 1 {
  800. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  801. }
  802. protocolType, ok := groups["my-group"]
  803. if !ok {
  804. t.Fatal("Expected group to be returned, but it did not")
  805. }
  806. if protocolType != "consumer" {
  807. t.Fatalf("Expected protocolType %v, got %v", "consumer", protocolType)
  808. }
  809. err = admin.Close()
  810. if err != nil {
  811. t.Fatal(err)
  812. }
  813. }
  814. func TestListConsumerGroupsMultiBroker(t *testing.T) {
  815. seedBroker := NewMockBroker(t, 1)
  816. defer seedBroker.Close()
  817. secondBroker := NewMockBroker(t, 2)
  818. defer secondBroker.Close()
  819. firstGroup := "first"
  820. secondGroup := "second"
  821. nonExistingGroup := "non-existing-group"
  822. seedBroker.SetHandlerByMap(map[string]MockResponse{
  823. "MetadataRequest": NewMockMetadataResponse(t).
  824. SetController(seedBroker.BrokerID()).
  825. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  826. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  827. "ListGroupsRequest": NewMockListGroupsResponse(t).
  828. AddGroup(firstGroup, "consumer"),
  829. })
  830. secondBroker.SetHandlerByMap(map[string]MockResponse{
  831. "MetadataRequest": NewMockMetadataResponse(t).
  832. SetController(seedBroker.BrokerID()).
  833. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  834. SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
  835. "ListGroupsRequest": NewMockListGroupsResponse(t).
  836. AddGroup(secondGroup, "consumer"),
  837. })
  838. config := NewConfig()
  839. config.Version = V1_0_0_0
  840. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  841. if err != nil {
  842. t.Fatal(err)
  843. }
  844. groups, err := admin.ListConsumerGroups()
  845. if err != nil {
  846. t.Fatal(err)
  847. }
  848. if len(groups) != 2 {
  849. t.Fatalf("Expected %v results, got %v", 1, len(groups))
  850. }
  851. if _, found := groups[firstGroup]; !found {
  852. t.Fatalf("Expected group %v to be present in result set, but it isn't", firstGroup)
  853. }
  854. if _, found := groups[secondGroup]; !found {
  855. t.Fatalf("Expected group %v to be present in result set, but it isn't", secondGroup)
  856. }
  857. if _, found := groups[nonExistingGroup]; found {
  858. t.Fatalf("Expected group %v to not exist, but it exists", nonExistingGroup)
  859. }
  860. err = admin.Close()
  861. if err != nil {
  862. t.Fatal(err)
  863. }
  864. }
  865. func TestListConsumerGroupOffsets(t *testing.T) {
  866. seedBroker := NewMockBroker(t, 1)
  867. defer seedBroker.Close()
  868. group := "my-group"
  869. topic := "my-topic"
  870. partition := int32(0)
  871. expectedOffset := int64(0)
  872. seedBroker.SetHandlerByMap(map[string]MockResponse{
  873. "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError).SetError(ErrNoError),
  874. "MetadataRequest": NewMockMetadataResponse(t).
  875. SetController(seedBroker.BrokerID()).
  876. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  877. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
  878. })
  879. config := NewConfig()
  880. config.Version = V1_0_0_0
  881. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  882. if err != nil {
  883. t.Fatal(err)
  884. }
  885. response, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{
  886. topic: []int32{0},
  887. })
  888. if err != nil {
  889. t.Fatalf("ListConsumerGroupOffsets failed with error %v", err)
  890. }
  891. block := response.GetBlock(topic, partition)
  892. if block == nil {
  893. t.Fatalf("Expected block for topic %v and partition %v to exist, but it doesn't", topic, partition)
  894. }
  895. if block.Offset != expectedOffset {
  896. t.Fatalf("Expected offset %v, got %v", expectedOffset, block.Offset)
  897. }
  898. err = admin.Close()
  899. if err != nil {
  900. t.Fatal(err)
  901. }
  902. }
  903. func TestDeleteConsumerGroup(t *testing.T) {
  904. seedBroker := NewMockBroker(t, 1)
  905. defer seedBroker.Close()
  906. group := "my-group"
  907. seedBroker.SetHandlerByMap(map[string]MockResponse{
  908. // "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
  909. "DeleteGroupsRequest": NewMockDeleteGroupsRequest(t).SetDeletedGroups([]string{group}),
  910. "MetadataRequest": NewMockMetadataResponse(t).
  911. SetController(seedBroker.BrokerID()).
  912. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  913. "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
  914. })
  915. config := NewConfig()
  916. config.Version = V1_1_0_0
  917. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  918. if err != nil {
  919. t.Fatal(err)
  920. }
  921. err = admin.DeleteConsumerGroup(group)
  922. if err != nil {
  923. t.Fatalf("DeleteConsumerGroup failed with error %v", err)
  924. }
  925. }