admin_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. package sarama
  2. import (
  3. "errors"
  4. "testing"
  5. )
  6. func TestClusterAdmin(t *testing.T) {
  7. seedBroker := NewMockBroker(t, 1)
  8. defer seedBroker.Close()
  9. seedBroker.SetHandlerByMap(map[string]MockResponse{
  10. "MetadataRequest": NewMockMetadataResponse(t).
  11. SetController(seedBroker.BrokerID()).
  12. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  13. })
  14. config := NewConfig()
  15. config.Version = V1_0_0_0
  16. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  17. if err != nil {
  18. t.Fatal(err)
  19. }
  20. err = admin.Close()
  21. if err != nil {
  22. t.Fatal(err)
  23. }
  24. }
  25. func TestClusterAdminInvalidController(t *testing.T) {
  26. seedBroker := NewMockBroker(t, 1)
  27. defer seedBroker.Close()
  28. seedBroker.SetHandlerByMap(map[string]MockResponse{
  29. "MetadataRequest": NewMockMetadataResponse(t).
  30. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  31. })
  32. config := NewConfig()
  33. config.Version = V1_0_0_0
  34. _, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  35. if err == nil {
  36. t.Fatal(errors.New("controller not set still cluster admin was created"))
  37. }
  38. if err != ErrControllerNotAvailable {
  39. t.Fatal(err)
  40. }
  41. }
  42. func TestClusterAdminCreateTopic(t *testing.T) {
  43. seedBroker := NewMockBroker(t, 1)
  44. defer seedBroker.Close()
  45. seedBroker.SetHandlerByMap(map[string]MockResponse{
  46. "MetadataRequest": NewMockMetadataResponse(t).
  47. SetController(seedBroker.BrokerID()).
  48. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  49. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  50. })
  51. config := NewConfig()
  52. config.Version = V0_10_2_0
  53. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  54. if err != nil {
  55. t.Fatal(err)
  56. }
  57. err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  58. if err != nil {
  59. t.Fatal(err)
  60. }
  61. err = admin.Close()
  62. if err != nil {
  63. t.Fatal(err)
  64. }
  65. }
  66. func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) {
  67. seedBroker := NewMockBroker(t, 1)
  68. defer seedBroker.Close()
  69. seedBroker.SetHandlerByMap(map[string]MockResponse{
  70. "MetadataRequest": NewMockMetadataResponse(t).
  71. SetController(seedBroker.BrokerID()).
  72. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  73. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  74. })
  75. config := NewConfig()
  76. config.Version = V0_10_2_0
  77. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  78. if err != nil {
  79. t.Fatal(err)
  80. }
  81. err = admin.CreateTopic("my_topic", nil, false)
  82. if err.Error() != "You must specify topic details" {
  83. t.Fatal(err)
  84. }
  85. err = admin.Close()
  86. if err != nil {
  87. t.Fatal(err)
  88. }
  89. }
  90. func TestClusterAdminCreateTopicWithDiffVersion(t *testing.T) {
  91. seedBroker := NewMockBroker(t, 1)
  92. defer seedBroker.Close()
  93. seedBroker.SetHandlerByMap(map[string]MockResponse{
  94. "MetadataRequest": NewMockMetadataResponse(t).
  95. SetController(seedBroker.BrokerID()).
  96. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  97. "CreateTopicsRequest": NewMockCreateTopicsResponse(t),
  98. })
  99. config := NewConfig()
  100. config.Version = V0_11_0_0
  101. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  102. if err != nil {
  103. t.Fatal(err)
  104. }
  105. err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
  106. if err != ErrInsufficientData {
  107. t.Fatal(err)
  108. }
  109. err = admin.Close()
  110. if err != nil {
  111. t.Fatal(err)
  112. }
  113. }
  114. func TestClusterAdminDeleteTopic(t *testing.T) {
  115. seedBroker := NewMockBroker(t, 1)
  116. defer seedBroker.Close()
  117. seedBroker.SetHandlerByMap(map[string]MockResponse{
  118. "MetadataRequest": NewMockMetadataResponse(t).
  119. SetController(seedBroker.BrokerID()).
  120. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  121. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  122. })
  123. config := NewConfig()
  124. config.Version = V0_10_2_0
  125. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  126. if err != nil {
  127. t.Fatal(err)
  128. }
  129. err = admin.DeleteTopic("my_topic")
  130. if err != nil {
  131. t.Fatal(err)
  132. }
  133. err = admin.Close()
  134. if err != nil {
  135. t.Fatal(err)
  136. }
  137. }
  138. func TestClusterAdminDeleteEmptyTopic(t *testing.T) {
  139. seedBroker := NewMockBroker(t, 1)
  140. defer seedBroker.Close()
  141. seedBroker.SetHandlerByMap(map[string]MockResponse{
  142. "MetadataRequest": NewMockMetadataResponse(t).
  143. SetController(seedBroker.BrokerID()).
  144. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  145. "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
  146. })
  147. config := NewConfig()
  148. config.Version = V0_10_2_0
  149. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  150. if err != nil {
  151. t.Fatal(err)
  152. }
  153. err = admin.DeleteTopic("")
  154. if err != ErrInvalidTopic {
  155. t.Fatal(err)
  156. }
  157. err = admin.Close()
  158. if err != nil {
  159. t.Fatal(err)
  160. }
  161. }
  162. func TestClusterAdminCreatePartitions(t *testing.T) {
  163. seedBroker := NewMockBroker(t, 1)
  164. defer seedBroker.Close()
  165. seedBroker.SetHandlerByMap(map[string]MockResponse{
  166. "MetadataRequest": NewMockMetadataResponse(t).
  167. SetController(seedBroker.BrokerID()).
  168. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  169. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  170. })
  171. config := NewConfig()
  172. config.Version = V1_0_0_0
  173. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  174. if err != nil {
  175. t.Fatal(err)
  176. }
  177. err = admin.CreatePartitions("my_topic", 3, nil, false)
  178. if err != nil {
  179. t.Fatal(err)
  180. }
  181. err = admin.Close()
  182. if err != nil {
  183. t.Fatal(err)
  184. }
  185. }
  186. func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) {
  187. seedBroker := NewMockBroker(t, 1)
  188. defer seedBroker.Close()
  189. seedBroker.SetHandlerByMap(map[string]MockResponse{
  190. "MetadataRequest": NewMockMetadataResponse(t).
  191. SetController(seedBroker.BrokerID()).
  192. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  193. "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
  194. })
  195. config := NewConfig()
  196. config.Version = V0_10_2_0
  197. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  198. if err != nil {
  199. t.Fatal(err)
  200. }
  201. err = admin.CreatePartitions("my_topic", 3, nil, false)
  202. if err != ErrUnsupportedVersion {
  203. t.Fatal(err)
  204. }
  205. err = admin.Close()
  206. if err != nil {
  207. t.Fatal(err)
  208. }
  209. }
  210. func TestClusterAdminDeleteRecords(t *testing.T) {
  211. seedBroker := NewMockBroker(t, 1)
  212. defer seedBroker.Close()
  213. seedBroker.SetHandlerByMap(map[string]MockResponse{
  214. "MetadataRequest": NewMockMetadataResponse(t).
  215. SetController(seedBroker.BrokerID()).
  216. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  217. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  218. })
  219. config := NewConfig()
  220. config.Version = V1_0_0_0
  221. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  222. if err != nil {
  223. t.Fatal(err)
  224. }
  225. partitionOffset := make(map[int32]int64)
  226. partitionOffset[1] = 1000
  227. partitionOffset[2] = 1000
  228. partitionOffset[3] = 1000
  229. err = admin.DeleteRecords("my_topic", partitionOffset)
  230. if err != nil {
  231. t.Fatal(err)
  232. }
  233. err = admin.Close()
  234. if err != nil {
  235. t.Fatal(err)
  236. }
  237. }
  238. func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
  239. seedBroker := NewMockBroker(t, 1)
  240. defer seedBroker.Close()
  241. seedBroker.SetHandlerByMap(map[string]MockResponse{
  242. "MetadataRequest": NewMockMetadataResponse(t).
  243. SetController(seedBroker.BrokerID()).
  244. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  245. "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
  246. })
  247. config := NewConfig()
  248. config.Version = V0_10_2_0
  249. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  250. if err != nil {
  251. t.Fatal(err)
  252. }
  253. partitionOffset := make(map[int32]int64)
  254. partitionOffset[1] = 1000
  255. partitionOffset[2] = 1000
  256. partitionOffset[3] = 1000
  257. err = admin.DeleteRecords("my_topic", partitionOffset)
  258. if err != ErrUnsupportedVersion {
  259. t.Fatal(err)
  260. }
  261. err = admin.Close()
  262. if err != nil {
  263. t.Fatal(err)
  264. }
  265. }
  266. func TestClusterAdminDescribeConfig(t *testing.T) {
  267. seedBroker := NewMockBroker(t, 1)
  268. defer seedBroker.Close()
  269. seedBroker.SetHandlerByMap(map[string]MockResponse{
  270. "MetadataRequest": NewMockMetadataResponse(t).
  271. SetController(seedBroker.BrokerID()).
  272. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  273. "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
  274. })
  275. config := NewConfig()
  276. config.Version = V1_0_0_0
  277. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  278. if err != nil {
  279. t.Fatal(err)
  280. }
  281. resource := ConfigResource{Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}}
  282. entries, err := admin.DescribeConfig(resource)
  283. if err != nil {
  284. t.Fatal(err)
  285. }
  286. if len(entries) <= 0 {
  287. t.Fatal(errors.New("no resource present"))
  288. }
  289. err = admin.Close()
  290. if err != nil {
  291. t.Fatal(err)
  292. }
  293. }
  294. func TestClusterAdminAlterConfig(t *testing.T) {
  295. seedBroker := NewMockBroker(t, 1)
  296. defer seedBroker.Close()
  297. seedBroker.SetHandlerByMap(map[string]MockResponse{
  298. "MetadataRequest": NewMockMetadataResponse(t).
  299. SetController(seedBroker.BrokerID()).
  300. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  301. "AlterConfigsRequest": NewMockAlterConfigsResponse(t),
  302. })
  303. config := NewConfig()
  304. config.Version = V1_0_0_0
  305. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  306. if err != nil {
  307. t.Fatal(err)
  308. }
  309. var value string
  310. entries := make(map[string]*string)
  311. value = "3"
  312. entries["ReplicationFactor"] = &value
  313. err = admin.AlterConfig(TopicResource, "my_topic", entries, false)
  314. if err != nil {
  315. t.Fatal(err)
  316. }
  317. err = admin.Close()
  318. if err != nil {
  319. t.Fatal(err)
  320. }
  321. }
  322. func TestClusterAdminCreateAcl(t *testing.T) {
  323. seedBroker := NewMockBroker(t, 1)
  324. defer seedBroker.Close()
  325. seedBroker.SetHandlerByMap(map[string]MockResponse{
  326. "MetadataRequest": NewMockMetadataResponse(t).
  327. SetController(seedBroker.BrokerID()).
  328. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  329. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  330. })
  331. config := NewConfig()
  332. config.Version = V1_0_0_0
  333. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  334. if err != nil {
  335. t.Fatal(err)
  336. }
  337. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  338. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  339. err = admin.CreateACL(r, a)
  340. if err != nil {
  341. t.Fatal(err)
  342. }
  343. err = admin.Close()
  344. if err != nil {
  345. t.Fatal(err)
  346. }
  347. }
  348. func TestClusterAdminListAcls(t *testing.T) {
  349. seedBroker := NewMockBroker(t, 1)
  350. defer seedBroker.Close()
  351. seedBroker.SetHandlerByMap(map[string]MockResponse{
  352. "MetadataRequest": NewMockMetadataResponse(t).
  353. SetController(seedBroker.BrokerID()).
  354. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  355. "DescribeAclsRequest": NewMockListAclsResponse(t),
  356. "CreateAclsRequest": NewMockCreateAclsResponse(t),
  357. })
  358. config := NewConfig()
  359. config.Version = V1_0_0_0
  360. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  361. if err != nil {
  362. t.Fatal(err)
  363. }
  364. r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
  365. a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
  366. err = admin.CreateACL(r, a)
  367. if err != nil {
  368. t.Fatal(err)
  369. }
  370. resourceName := "my_topic"
  371. filter := AclFilter{
  372. ResourceType: AclResourceTopic,
  373. Operation: AclOperationRead,
  374. ResourceName: &resourceName,
  375. }
  376. rAcls, err := admin.ListAcls(filter)
  377. if err != nil {
  378. t.Fatal(err)
  379. }
  380. if len(rAcls) <= 0 {
  381. t.Fatal("no acls present")
  382. }
  383. err = admin.Close()
  384. if err != nil {
  385. t.Fatal(err)
  386. }
  387. }
  388. func TestClusterAdminDeleteAcl(t *testing.T) {
  389. seedBroker := NewMockBroker(t, 1)
  390. defer seedBroker.Close()
  391. seedBroker.SetHandlerByMap(map[string]MockResponse{
  392. "MetadataRequest": NewMockMetadataResponse(t).
  393. SetController(seedBroker.BrokerID()).
  394. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  395. "DeleteAclsRequest": NewMockDeleteAclsResponse(t),
  396. })
  397. config := NewConfig()
  398. config.Version = V1_0_0_0
  399. admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
  400. if err != nil {
  401. t.Fatal(err)
  402. }
  403. resourceName := "my_topic"
  404. filter := AclFilter{
  405. ResourceType: AclResourceTopic,
  406. Operation: AclOperationAlter,
  407. ResourceName: &resourceName,
  408. }
  409. _, err = admin.DeleteACL(filter, false)
  410. if err != nil {
  411. t.Fatal(err)
  412. }
  413. err = admin.Close()
  414. if err != nil {
  415. t.Fatal(err)
  416. }
  417. }