balance_strategy_test.go 70 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365
  1. package sarama
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math"
  6. "math/rand"
  7. "reflect"
  8. "sort"
  9. "testing"
  10. "time"
  11. )
  12. func TestBalanceStrategyRange(t *testing.T) {
  13. tests := []struct {
  14. members map[string][]string
  15. topics map[string][]int32
  16. expected BalanceStrategyPlan
  17. }{
  18. {
  19. members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
  20. topics: map[string][]int32{"T1": {0, 1, 2, 3}, "T2": {0, 1, 2, 3}},
  21. expected: BalanceStrategyPlan{
  22. "M1": map[string][]int32{"T1": {0, 1}, "T2": {2, 3}},
  23. "M2": map[string][]int32{"T1": {2, 3}, "T2": {0, 1}},
  24. },
  25. },
  26. {
  27. members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
  28. topics: map[string][]int32{"T1": {0, 1, 2}, "T2": {0, 1, 2}},
  29. expected: BalanceStrategyPlan{
  30. "M1": map[string][]int32{"T1": {0, 1}, "T2": {2}},
  31. "M2": map[string][]int32{"T1": {2}, "T2": {0, 1}},
  32. },
  33. },
  34. {
  35. members: map[string][]string{"M1": {"T1"}, "M2": {"T1", "T2"}},
  36. topics: map[string][]int32{"T1": {0, 1}, "T2": {0, 1}},
  37. expected: BalanceStrategyPlan{
  38. "M1": map[string][]int32{"T1": {0}},
  39. "M2": map[string][]int32{"T1": {1}, "T2": {0, 1}},
  40. },
  41. },
  42. }
  43. strategy := BalanceStrategyRange
  44. if strategy.Name() != "range" {
  45. t.Errorf("Unexpected stategy name\nexpected: range\nactual: %v", strategy.Name())
  46. }
  47. for _, test := range tests {
  48. members := make(map[string]ConsumerGroupMemberMetadata)
  49. for memberID, topics := range test.members {
  50. members[memberID] = ConsumerGroupMemberMetadata{Topics: topics}
  51. }
  52. actual, err := strategy.Plan(members, test.topics)
  53. if err != nil {
  54. t.Errorf("Unexpected error %v", err)
  55. } else if !reflect.DeepEqual(actual, test.expected) {
  56. t.Errorf("Plan does not match expectation\nexpected: %#v\nactual: %#v", test.expected, actual)
  57. }
  58. }
  59. }
  60. func TestBalanceStrategyRangeAssignmentData(t *testing.T) {
  61. strategy := BalanceStrategyRange
  62. members := make(map[string]ConsumerGroupMemberMetadata, 2)
  63. members["consumer1"] = ConsumerGroupMemberMetadata{
  64. Topics: []string{"topic1"},
  65. }
  66. members["consumer2"] = ConsumerGroupMemberMetadata{
  67. Topics: []string{"topic1"},
  68. }
  69. actual, err := strategy.AssignmentData("consumer1", map[string][]int32{"topic1": {0, 1}}, 1)
  70. if err != nil {
  71. t.Errorf("Error building assignment data: %v", err)
  72. }
  73. if actual != nil {
  74. t.Error("Invalid assignment data returned from AssignmentData")
  75. }
  76. }
  77. func TestBalanceStrategyRoundRobin(t *testing.T) {
  78. tests := []struct {
  79. members map[string][]string
  80. topics map[string][]int32
  81. expected BalanceStrategyPlan
  82. }{
  83. {
  84. members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
  85. topics: map[string][]int32{"T1": {0, 1, 2, 3}, "T2": {0, 1, 2, 3}},
  86. expected: BalanceStrategyPlan{
  87. "M1": map[string][]int32{"T1": {0, 2}, "T2": {1, 3}},
  88. "M2": map[string][]int32{"T1": {1, 3}, "T2": {0, 2}},
  89. },
  90. },
  91. {
  92. members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
  93. topics: map[string][]int32{"T1": {0, 1, 2}, "T2": {0, 1, 2}},
  94. expected: BalanceStrategyPlan{
  95. "M1": map[string][]int32{"T1": {0, 2}, "T2": {1}},
  96. "M2": map[string][]int32{"T1": {1}, "T2": {0, 2}},
  97. },
  98. },
  99. }
  100. strategy := BalanceStrategyRoundRobin
  101. if strategy.Name() != "roundrobin" {
  102. t.Errorf("Unexpected stategy name\nexpected: range\nactual: %v", strategy.Name())
  103. }
  104. for _, test := range tests {
  105. members := make(map[string]ConsumerGroupMemberMetadata)
  106. for memberID, topics := range test.members {
  107. members[memberID] = ConsumerGroupMemberMetadata{Topics: topics}
  108. }
  109. actual, err := strategy.Plan(members, test.topics)
  110. if err != nil {
  111. t.Errorf("Unexpected error %v", err)
  112. } else if !reflect.DeepEqual(actual, test.expected) {
  113. t.Errorf("Plan does not match expectation\nexpected: %#v\nactual: %#v", test.expected, actual)
  114. }
  115. }
  116. }
  117. func Test_deserializeTopicPartitionAssignment(t *testing.T) {
  118. type args struct {
  119. userDataBytes []byte
  120. }
  121. tests := []struct {
  122. name string
  123. args args
  124. want StickyAssignorUserData
  125. wantErr bool
  126. }{
  127. {
  128. name: "Nil userdata bytes",
  129. args: args{},
  130. want: &StickyAssignorUserDataV1{},
  131. },
  132. {
  133. name: "Non-empty invalid userdata bytes",
  134. args: args{
  135. userDataBytes: []byte{
  136. 0x00, 0x00,
  137. 0x00, 0x00, 0x00, 0x01,
  138. 0x00, 0x03, 'f', 'o', 'o',
  139. },
  140. },
  141. wantErr: true,
  142. },
  143. {
  144. name: "Valid v0 userdata bytes",
  145. args: args{
  146. userDataBytes: []byte{
  147. 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
  148. 0x33, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
  149. 0x05,
  150. },
  151. },
  152. want: &StickyAssignorUserDataV0{
  153. Topics: map[string][]int32{"t03": {5}},
  154. topicPartitions: []topicPartitionAssignment{
  155. {
  156. Topic: "t03",
  157. Partition: 5,
  158. },
  159. },
  160. },
  161. },
  162. {
  163. name: "Valid v1 userdata bytes",
  164. args: args{
  165. userDataBytes: []byte{
  166. 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
  167. 0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
  168. 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff,
  169. 0xff,
  170. },
  171. },
  172. want: &StickyAssignorUserDataV1{
  173. Topics: map[string][]int32{"t06": {0, 4}},
  174. Generation: -1,
  175. topicPartitions: []topicPartitionAssignment{
  176. {
  177. Topic: "t06",
  178. Partition: 0,
  179. },
  180. {
  181. Topic: "t06",
  182. Partition: 4,
  183. },
  184. },
  185. },
  186. },
  187. }
  188. for _, tt := range tests {
  189. t.Run(tt.name, func(t *testing.T) {
  190. got, err := deserializeTopicPartitionAssignment(tt.args.userDataBytes)
  191. if (err != nil) != tt.wantErr {
  192. t.Errorf("deserializeTopicPartitionAssignment() error = %v, wantErr %v", err, tt.wantErr)
  193. return
  194. }
  195. if !reflect.DeepEqual(got, tt.want) {
  196. t.Errorf("deserializeTopicPartitionAssignment() = %v, want %v", got, tt.want)
  197. }
  198. })
  199. }
  200. }
  201. func TestBalanceStrategyRoundRobinAssignmentData(t *testing.T) {
  202. strategy := BalanceStrategyRoundRobin
  203. members := make(map[string]ConsumerGroupMemberMetadata, 2)
  204. members["consumer1"] = ConsumerGroupMemberMetadata{
  205. Topics: []string{"topic1"},
  206. }
  207. members["consumer2"] = ConsumerGroupMemberMetadata{
  208. Topics: []string{"topic1"},
  209. }
  210. actual, err := strategy.AssignmentData("consumer1", map[string][]int32{"topic1": {0, 1}}, 1)
  211. if err != nil {
  212. t.Errorf("Error building assignment data: %v", err)
  213. }
  214. if actual != nil {
  215. t.Error("Invalid assignment data returned from AssignmentData")
  216. }
  217. }
  218. func Test_prepopulateCurrentAssignments(t *testing.T) {
  219. type args struct {
  220. members map[string]ConsumerGroupMemberMetadata
  221. }
  222. tests := []struct {
  223. name string
  224. args args
  225. wantCurrentAssignments map[string][]topicPartitionAssignment
  226. wantPrevAssignments map[topicPartitionAssignment]consumerGenerationPair
  227. wantErr bool
  228. }{
  229. {
  230. name: "Empty map",
  231. wantCurrentAssignments: map[string][]topicPartitionAssignment{},
  232. wantPrevAssignments: map[topicPartitionAssignment]consumerGenerationPair{},
  233. },
  234. {
  235. name: "Single consumer",
  236. args: args{
  237. members: map[string]ConsumerGroupMemberMetadata{
  238. "c01": {
  239. Version: 2,
  240. UserData: []byte{
  241. 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
  242. 0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
  243. 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff,
  244. 0xff,
  245. },
  246. },
  247. },
  248. },
  249. wantCurrentAssignments: map[string][]topicPartitionAssignment{
  250. "c01": {
  251. {
  252. Topic: "t06",
  253. Partition: 0,
  254. },
  255. {
  256. Topic: "t06",
  257. Partition: 4,
  258. },
  259. },
  260. },
  261. wantPrevAssignments: map[topicPartitionAssignment]consumerGenerationPair{},
  262. },
  263. {
  264. name: "Duplicate consumer assignments in metadata",
  265. args: args{
  266. members: map[string]ConsumerGroupMemberMetadata{
  267. "c01": {
  268. Version: 2,
  269. UserData: []byte{
  270. 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
  271. 0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
  272. 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff,
  273. 0xff,
  274. },
  275. },
  276. "c02": {
  277. Version: 2,
  278. UserData: []byte{
  279. 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
  280. 0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
  281. 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff,
  282. 0xff,
  283. },
  284. },
  285. },
  286. },
  287. wantCurrentAssignments: map[string][]topicPartitionAssignment{
  288. "c01": {
  289. {
  290. Topic: "t06",
  291. Partition: 0,
  292. },
  293. {
  294. Topic: "t06",
  295. Partition: 4,
  296. },
  297. },
  298. },
  299. wantPrevAssignments: map[topicPartitionAssignment]consumerGenerationPair{},
  300. },
  301. {
  302. name: "Different generations (5, 6) of consumer assignments in metadata",
  303. args: args{
  304. members: map[string]ConsumerGroupMemberMetadata{
  305. "c01": {
  306. Version: 2,
  307. UserData: []byte{
  308. 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
  309. 0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
  310. 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00,
  311. 0x05,
  312. },
  313. },
  314. "c02": {
  315. Version: 2,
  316. UserData: []byte{
  317. 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
  318. 0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
  319. 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00,
  320. 0x06,
  321. },
  322. },
  323. },
  324. },
  325. wantCurrentAssignments: map[string][]topicPartitionAssignment{
  326. "c01": {
  327. {
  328. Topic: "t06",
  329. Partition: 0,
  330. },
  331. {
  332. Topic: "t06",
  333. Partition: 4,
  334. },
  335. },
  336. },
  337. wantPrevAssignments: map[topicPartitionAssignment]consumerGenerationPair{
  338. {
  339. Topic: "t06",
  340. Partition: 0,
  341. }: {
  342. Generation: 5,
  343. MemberID: "c01",
  344. },
  345. {
  346. Topic: "t06",
  347. Partition: 4,
  348. }: {
  349. Generation: 5,
  350. MemberID: "c01",
  351. },
  352. },
  353. },
  354. }
  355. for _, tt := range tests {
  356. t.Run(tt.name, func(t *testing.T) {
  357. _, gotPrevAssignments, err := prepopulateCurrentAssignments(tt.args.members)
  358. if (err != nil) != tt.wantErr {
  359. t.Errorf("prepopulateCurrentAssignments() error = %v, wantErr %v", err, tt.wantErr)
  360. }
  361. if !reflect.DeepEqual(gotPrevAssignments, tt.wantPrevAssignments) {
  362. t.Errorf("deserializeTopicPartitionAssignment() prevAssignments = %v, want %v", gotPrevAssignments, tt.wantPrevAssignments)
  363. }
  364. })
  365. }
  366. }
  367. func Test_areSubscriptionsIdentical(t *testing.T) {
  368. type args struct {
  369. partition2AllPotentialConsumers map[topicPartitionAssignment][]string
  370. consumer2AllPotentialPartitions map[string][]topicPartitionAssignment
  371. }
  372. tests := []struct {
  373. name string
  374. args args
  375. want bool
  376. }{
  377. {
  378. name: "Empty consumers and partitions",
  379. args: args{
  380. partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
  381. consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
  382. },
  383. want: true,
  384. },
  385. {
  386. name: "Topic partitions with identical consumer entries",
  387. args: args{
  388. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  389. {Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
  390. {Topic: "t1", Partition: 1}: {"c1", "c2", "c3"},
  391. {Topic: "t1", Partition: 2}: {"c1", "c2", "c3"},
  392. },
  393. consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
  394. },
  395. want: true,
  396. },
  397. {
  398. name: "Topic partitions with mixed up consumer entries",
  399. args: args{
  400. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  401. {Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
  402. {Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
  403. {Topic: "t1", Partition: 2}: {"c3", "c1", "c2"},
  404. },
  405. consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
  406. },
  407. want: true,
  408. },
  409. {
  410. name: "Topic partitions with different consumer entries",
  411. args: args{
  412. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  413. {Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
  414. {Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
  415. {Topic: "t1", Partition: 2}: {"cX", "c1", "c2"},
  416. },
  417. consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
  418. },
  419. want: false,
  420. },
  421. {
  422. name: "Topic partitions with different number of consumer entries",
  423. args: args{
  424. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  425. {Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
  426. {Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
  427. {Topic: "t1", Partition: 2}: {"c1", "c2"},
  428. },
  429. consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
  430. },
  431. want: false,
  432. },
  433. {
  434. name: "Consumers with identical topic partitions",
  435. args: args{
  436. partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
  437. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  438. "c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  439. "c2": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  440. "c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  441. },
  442. },
  443. want: true,
  444. },
  445. {
  446. name: "Consumer2 with mixed up consumer entries",
  447. args: args{
  448. partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
  449. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  450. "c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  451. "c2": {{Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}, {Topic: "t1", Partition: 0}},
  452. "c3": {{Topic: "t1", Partition: 2}, {Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}},
  453. },
  454. },
  455. want: true,
  456. },
  457. {
  458. name: "Consumer2 with different consumer entries",
  459. args: args{
  460. partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
  461. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  462. "c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  463. "c2": {{Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}, {Topic: "t1", Partition: 0}},
  464. "c3": {{Topic: "tX", Partition: 2}, {Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}},
  465. },
  466. },
  467. want: false,
  468. },
  469. {
  470. name: "Consumer2 with different number of consumer entries",
  471. args: args{
  472. partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
  473. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  474. "c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  475. "c2": {{Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}, {Topic: "t1", Partition: 0}},
  476. "c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}},
  477. },
  478. },
  479. want: false,
  480. },
  481. }
  482. for _, tt := range tests {
  483. t.Run(tt.name, func(t *testing.T) {
  484. if got := areSubscriptionsIdentical(tt.args.partition2AllPotentialConsumers, tt.args.consumer2AllPotentialPartitions); got != tt.want {
  485. t.Errorf("areSubscriptionsIdentical() = %v, want %v", got, tt.want)
  486. }
  487. })
  488. }
  489. }
  490. func Test_sortMemberIDsByPartitionAssignments(t *testing.T) {
  491. type args struct {
  492. assignments map[string][]topicPartitionAssignment
  493. }
  494. tests := []struct {
  495. name string
  496. args args
  497. want []string
  498. }{
  499. {
  500. name: "Null assignments",
  501. want: make([]string, 0),
  502. },
  503. {
  504. name: "Single assignment",
  505. args: args{
  506. assignments: map[string][]topicPartitionAssignment{
  507. "c1": {
  508. {Topic: "t1", Partition: 0},
  509. {Topic: "t1", Partition: 1},
  510. {Topic: "t1", Partition: 2},
  511. },
  512. },
  513. },
  514. want: []string{"c1"},
  515. },
  516. {
  517. name: "Multiple assignments with different partition counts",
  518. args: args{
  519. assignments: map[string][]topicPartitionAssignment{
  520. "c1": {
  521. {Topic: "t1", Partition: 0},
  522. },
  523. "c2": {
  524. {Topic: "t1", Partition: 1},
  525. {Topic: "t1", Partition: 2},
  526. },
  527. "c3": {
  528. {Topic: "t1", Partition: 3},
  529. {Topic: "t1", Partition: 4},
  530. {Topic: "t1", Partition: 5},
  531. },
  532. },
  533. },
  534. want: []string{"c1", "c2", "c3"},
  535. },
  536. }
  537. for _, tt := range tests {
  538. t.Run(tt.name, func(t *testing.T) {
  539. if got := sortMemberIDsByPartitionAssignments(tt.args.assignments); !reflect.DeepEqual(got, tt.want) {
  540. t.Errorf("sortMemberIDsByPartitionAssignments() = %v, want %v", got, tt.want)
  541. }
  542. })
  543. }
  544. }
  545. func Test_sortPartitions(t *testing.T) {
  546. type args struct {
  547. currentAssignment map[string][]topicPartitionAssignment
  548. partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair
  549. isFreshAssignment bool
  550. partition2AllPotentialConsumers map[topicPartitionAssignment][]string
  551. consumer2AllPotentialPartitions map[string][]topicPartitionAssignment
  552. }
  553. tests := []struct {
  554. name string
  555. args args
  556. want []topicPartitionAssignment
  557. }{
  558. {
  559. name: "Empty everything",
  560. want: make([]topicPartitionAssignment, 0),
  561. },
  562. {
  563. name: "Base case",
  564. args: args{
  565. currentAssignment: map[string][]topicPartitionAssignment{
  566. "c1": {{Topic: "t1", Partition: 0}},
  567. "c2": {{Topic: "t1", Partition: 1}},
  568. "c3": {{Topic: "t1", Partition: 2}},
  569. },
  570. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  571. "c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  572. "c2": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  573. "c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  574. },
  575. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  576. {Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
  577. {Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
  578. {Topic: "t1", Partition: 2}: {"c3", "c1", "c2"},
  579. },
  580. },
  581. },
  582. {
  583. name: "Partitions assigned to a different consumer last time",
  584. args: args{
  585. currentAssignment: map[string][]topicPartitionAssignment{
  586. "c1": {{Topic: "t1", Partition: 0}},
  587. },
  588. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  589. "c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  590. "c2": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  591. "c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  592. },
  593. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  594. {Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
  595. {Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
  596. {Topic: "t1", Partition: 2}: {"c3", "c1", "c2"},
  597. },
  598. partitionsWithADifferentPreviousAssignment: map[topicPartitionAssignment]consumerGenerationPair{
  599. {Topic: "t1", Partition: 0}: {Generation: 1, MemberID: "c2"},
  600. },
  601. },
  602. },
  603. {
  604. name: "Partitions assigned to a different consumer last time",
  605. args: args{
  606. currentAssignment: map[string][]topicPartitionAssignment{
  607. "c1": {{Topic: "t1", Partition: 0}},
  608. "c2": {{Topic: "t1", Partition: 1}},
  609. },
  610. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  611. "c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  612. "c2": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  613. "c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  614. },
  615. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  616. {Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
  617. {Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
  618. {Topic: "t1", Partition: 2}: {"c3", "c1", "c2"},
  619. },
  620. partitionsWithADifferentPreviousAssignment: map[topicPartitionAssignment]consumerGenerationPair{
  621. {Topic: "t1", Partition: 0}: {Generation: 1, MemberID: "c2"},
  622. },
  623. },
  624. },
  625. {
  626. name: "Fresh assignment",
  627. args: args{
  628. isFreshAssignment: true,
  629. currentAssignment: map[string][]topicPartitionAssignment{},
  630. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  631. "c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  632. "c2": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  633. "c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  634. },
  635. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  636. {Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
  637. {Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
  638. {Topic: "t1", Partition: 2}: {"c3", "c1", "c2"},
  639. },
  640. partitionsWithADifferentPreviousAssignment: map[topicPartitionAssignment]consumerGenerationPair{
  641. {Topic: "t1", Partition: 0}: {Generation: 1, MemberID: "c2"},
  642. },
  643. },
  644. },
  645. }
  646. for _, tt := range tests {
  647. t.Run(tt.name, func(t *testing.T) {
  648. got := sortPartitions(tt.args.currentAssignment, tt.args.partitionsWithADifferentPreviousAssignment, tt.args.isFreshAssignment, tt.args.partition2AllPotentialConsumers, tt.args.consumer2AllPotentialPartitions)
  649. if tt.want != nil && !reflect.DeepEqual(got, tt.want) {
  650. t.Errorf("sortPartitions() = %v, want %v", got, tt.want)
  651. }
  652. })
  653. }
  654. }
  655. func Test_filterAssignedPartitions(t *testing.T) {
  656. type args struct {
  657. currentAssignment map[string][]topicPartitionAssignment
  658. partition2AllPotentialConsumers map[topicPartitionAssignment][]string
  659. }
  660. tests := []struct {
  661. name string
  662. args args
  663. want map[string][]topicPartitionAssignment
  664. }{
  665. {
  666. name: "All partitions accounted for",
  667. args: args{
  668. currentAssignment: map[string][]topicPartitionAssignment{
  669. "c1": {{Topic: "t1", Partition: 0}},
  670. "c2": {{Topic: "t1", Partition: 1}},
  671. },
  672. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  673. {Topic: "t1", Partition: 0}: {"c1"},
  674. {Topic: "t1", Partition: 1}: {"c2"},
  675. },
  676. },
  677. want: map[string][]topicPartitionAssignment{
  678. "c1": {{Topic: "t1", Partition: 0}},
  679. "c2": {{Topic: "t1", Partition: 1}},
  680. },
  681. },
  682. {
  683. name: "One consumer using an unrecognized partition",
  684. args: args{
  685. currentAssignment: map[string][]topicPartitionAssignment{
  686. "c1": {{Topic: "t1", Partition: 0}},
  687. "c2": {{Topic: "t1", Partition: 1}},
  688. },
  689. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  690. {Topic: "t1", Partition: 0}: {"c1"},
  691. },
  692. },
  693. want: map[string][]topicPartitionAssignment{
  694. "c1": {{Topic: "t1", Partition: 0}},
  695. "c2": {},
  696. },
  697. },
  698. {
  699. name: "Interleaved consumer removal",
  700. args: args{
  701. currentAssignment: map[string][]topicPartitionAssignment{
  702. "c1": {{Topic: "t1", Partition: 0}},
  703. "c2": {{Topic: "t1", Partition: 1}},
  704. "c3": {{Topic: "t1", Partition: 2}},
  705. },
  706. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  707. {Topic: "t1", Partition: 0}: {"c1"},
  708. {Topic: "t1", Partition: 2}: {"c3"},
  709. },
  710. },
  711. want: map[string][]topicPartitionAssignment{
  712. "c1": {{Topic: "t1", Partition: 0}},
  713. "c2": {},
  714. "c3": {{Topic: "t1", Partition: 2}},
  715. },
  716. },
  717. }
  718. for _, tt := range tests {
  719. t.Run(tt.name, func(t *testing.T) {
  720. if got := filterAssignedPartitions(tt.args.currentAssignment, tt.args.partition2AllPotentialConsumers); !reflect.DeepEqual(got, tt.want) {
  721. t.Errorf("filterAssignedPartitions() = %v, want %v", got, tt.want)
  722. }
  723. })
  724. }
  725. }
  726. func Test_canConsumerParticipateInReassignment(t *testing.T) {
  727. type args struct {
  728. memberID string
  729. currentAssignment map[string][]topicPartitionAssignment
  730. consumer2AllPotentialPartitions map[string][]topicPartitionAssignment
  731. partition2AllPotentialConsumers map[topicPartitionAssignment][]string
  732. }
  733. tests := []struct {
  734. name string
  735. args args
  736. want bool
  737. }{
  738. {
  739. name: "Consumer has been assigned partitions not available to it",
  740. args: args{
  741. memberID: "c1",
  742. currentAssignment: map[string][]topicPartitionAssignment{
  743. "c1": {
  744. {Topic: "t1", Partition: 0},
  745. {Topic: "t1", Partition: 1},
  746. {Topic: "t1", Partition: 2},
  747. },
  748. "c2": {},
  749. },
  750. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  751. "c1": {
  752. {Topic: "t1", Partition: 0},
  753. {Topic: "t1", Partition: 1},
  754. },
  755. "c2": {
  756. {Topic: "t1", Partition: 0},
  757. {Topic: "t1", Partition: 1},
  758. {Topic: "t1", Partition: 2},
  759. },
  760. },
  761. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  762. {Topic: "t1", Partition: 0}: {"c1", "c2"},
  763. {Topic: "t1", Partition: 1}: {"c1", "c2"},
  764. {Topic: "t1", Partition: 2}: {"c2"},
  765. },
  766. },
  767. want: true,
  768. },
  769. {
  770. name: "Consumer has been assigned all available partitions",
  771. args: args{
  772. memberID: "c1",
  773. currentAssignment: map[string][]topicPartitionAssignment{
  774. "c1": {
  775. {Topic: "t1", Partition: 0},
  776. {Topic: "t1", Partition: 1},
  777. },
  778. },
  779. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  780. "c1": {
  781. {Topic: "t1", Partition: 0},
  782. {Topic: "t1", Partition: 1},
  783. },
  784. },
  785. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  786. {Topic: "t1", Partition: 0}: {"c1"},
  787. {Topic: "t1", Partition: 1}: {"c1"},
  788. },
  789. },
  790. want: false,
  791. },
  792. {
  793. name: "Consumer has not been assigned all available partitions",
  794. args: args{
  795. memberID: "c1",
  796. currentAssignment: map[string][]topicPartitionAssignment{
  797. "c1": {
  798. {Topic: "t1", Partition: 0},
  799. {Topic: "t1", Partition: 1},
  800. },
  801. },
  802. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  803. "c1": {
  804. {Topic: "t1", Partition: 0},
  805. {Topic: "t1", Partition: 1},
  806. {Topic: "t1", Partition: 2},
  807. },
  808. },
  809. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  810. {Topic: "t1", Partition: 0}: {"c1"},
  811. {Topic: "t1", Partition: 1}: {"c1"},
  812. {Topic: "t1", Partition: 2}: {"c1"},
  813. },
  814. },
  815. want: true,
  816. },
  817. }
  818. for _, tt := range tests {
  819. t.Run(tt.name, func(t *testing.T) {
  820. if got := canConsumerParticipateInReassignment(tt.args.memberID, tt.args.currentAssignment, tt.args.consumer2AllPotentialPartitions, tt.args.partition2AllPotentialConsumers); got != tt.want {
  821. t.Errorf("canConsumerParticipateInReassignment() = %v, want %v", got, tt.want)
  822. }
  823. })
  824. }
  825. }
  826. func Test_removeTopicPartitionFromMemberAssignments(t *testing.T) {
  827. type args struct {
  828. assignments []topicPartitionAssignment
  829. topic topicPartitionAssignment
  830. }
  831. tests := []struct {
  832. name string
  833. args args
  834. want []topicPartitionAssignment
  835. }{
  836. {
  837. name: "Empty",
  838. args: args{
  839. assignments: make([]topicPartitionAssignment, 0),
  840. topic: topicPartitionAssignment{Topic: "t1", Partition: 0},
  841. },
  842. want: make([]topicPartitionAssignment, 0),
  843. },
  844. {
  845. name: "Remove first entry",
  846. args: args{
  847. assignments: []topicPartitionAssignment{
  848. {Topic: "t1", Partition: 0},
  849. {Topic: "t1", Partition: 1},
  850. {Topic: "t1", Partition: 2},
  851. },
  852. topic: topicPartitionAssignment{Topic: "t1", Partition: 0},
  853. },
  854. want: []topicPartitionAssignment{
  855. {Topic: "t1", Partition: 1},
  856. {Topic: "t1", Partition: 2},
  857. },
  858. },
  859. {
  860. name: "Remove middle entry",
  861. args: args{
  862. assignments: []topicPartitionAssignment{
  863. {Topic: "t1", Partition: 0},
  864. {Topic: "t1", Partition: 1},
  865. {Topic: "t1", Partition: 2},
  866. },
  867. topic: topicPartitionAssignment{Topic: "t1", Partition: 1},
  868. },
  869. want: []topicPartitionAssignment{
  870. {Topic: "t1", Partition: 0},
  871. {Topic: "t1", Partition: 2},
  872. },
  873. },
  874. {
  875. name: "Remove last entry",
  876. args: args{
  877. assignments: []topicPartitionAssignment{
  878. {Topic: "t1", Partition: 0},
  879. {Topic: "t1", Partition: 1},
  880. {Topic: "t1", Partition: 2},
  881. },
  882. topic: topicPartitionAssignment{Topic: "t1", Partition: 2},
  883. },
  884. want: []topicPartitionAssignment{
  885. {Topic: "t1", Partition: 0},
  886. {Topic: "t1", Partition: 1},
  887. },
  888. },
  889. }
  890. for _, tt := range tests {
  891. t.Run(tt.name, func(t *testing.T) {
  892. if got := removeTopicPartitionFromMemberAssignments(tt.args.assignments, tt.args.topic); !reflect.DeepEqual(got, tt.want) {
  893. t.Errorf("removeTopicPartitionFromMemberAssignments() = %v, want %v", got, tt.want)
  894. }
  895. })
  896. }
  897. }
  898. func Test_assignPartition(t *testing.T) {
  899. type args struct {
  900. partition topicPartitionAssignment
  901. sortedCurrentSubscriptions []string
  902. currentAssignment map[string][]topicPartitionAssignment
  903. consumer2AllPotentialPartitions map[string][]topicPartitionAssignment
  904. currentPartitionConsumer map[topicPartitionAssignment]string
  905. }
  906. tests := []struct {
  907. name string
  908. args args
  909. want []string
  910. wantCurrentAssignment map[string][]topicPartitionAssignment
  911. wantCurrentPartitionConsumer map[topicPartitionAssignment]string
  912. }{
  913. {
  914. name: "Base",
  915. args: args{
  916. partition: topicPartitionAssignment{Topic: "t1", Partition: 2},
  917. sortedCurrentSubscriptions: []string{"c3", "c1", "c2"},
  918. currentAssignment: map[string][]topicPartitionAssignment{
  919. "c1": {
  920. {Topic: "t1", Partition: 0},
  921. },
  922. "c2": {
  923. {Topic: "t1", Partition: 1},
  924. },
  925. "c3": {},
  926. },
  927. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  928. "c1": {
  929. {Topic: "t1", Partition: 0},
  930. },
  931. "c2": {
  932. {Topic: "t1", Partition: 1},
  933. },
  934. "c3": {
  935. {Topic: "t1", Partition: 2},
  936. },
  937. },
  938. currentPartitionConsumer: map[topicPartitionAssignment]string{
  939. {Topic: "t1", Partition: 0}: "c1",
  940. {Topic: "t1", Partition: 1}: "c2",
  941. },
  942. },
  943. want: []string{"c1", "c2", "c3"},
  944. wantCurrentAssignment: map[string][]topicPartitionAssignment{
  945. "c1": {
  946. {Topic: "t1", Partition: 0},
  947. },
  948. "c2": {
  949. {Topic: "t1", Partition: 1},
  950. },
  951. "c3": {
  952. {Topic: "t1", Partition: 2},
  953. },
  954. },
  955. wantCurrentPartitionConsumer: map[topicPartitionAssignment]string{
  956. {Topic: "t1", Partition: 0}: "c1",
  957. {Topic: "t1", Partition: 1}: "c2",
  958. {Topic: "t1", Partition: 2}: "c3",
  959. },
  960. },
  961. {
  962. name: "Unassignable Partition",
  963. args: args{
  964. partition: topicPartitionAssignment{Topic: "t1", Partition: 3},
  965. sortedCurrentSubscriptions: []string{"c3", "c1", "c2"},
  966. currentAssignment: map[string][]topicPartitionAssignment{
  967. "c1": {
  968. {Topic: "t1", Partition: 0},
  969. },
  970. "c2": {
  971. {Topic: "t1", Partition: 1},
  972. },
  973. "c3": {},
  974. },
  975. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  976. "c1": {
  977. {Topic: "t1", Partition: 0},
  978. },
  979. "c2": {
  980. {Topic: "t1", Partition: 1},
  981. },
  982. "c3": {
  983. {Topic: "t1", Partition: 2},
  984. },
  985. },
  986. currentPartitionConsumer: map[topicPartitionAssignment]string{
  987. {Topic: "t1", Partition: 0}: "c1",
  988. {Topic: "t1", Partition: 1}: "c2",
  989. },
  990. },
  991. want: []string{"c3", "c1", "c2"},
  992. wantCurrentAssignment: map[string][]topicPartitionAssignment{
  993. "c1": {
  994. {Topic: "t1", Partition: 0},
  995. },
  996. "c2": {
  997. {Topic: "t1", Partition: 1},
  998. },
  999. "c3": {},
  1000. },
  1001. wantCurrentPartitionConsumer: map[topicPartitionAssignment]string{
  1002. {Topic: "t1", Partition: 0}: "c1",
  1003. {Topic: "t1", Partition: 1}: "c2",
  1004. },
  1005. },
  1006. }
  1007. for _, tt := range tests {
  1008. t.Run(tt.name, func(t *testing.T) {
  1009. if got := assignPartition(tt.args.partition, tt.args.sortedCurrentSubscriptions, tt.args.currentAssignment, tt.args.consumer2AllPotentialPartitions, tt.args.currentPartitionConsumer); !reflect.DeepEqual(got, tt.want) {
  1010. t.Errorf("assignPartition() = %v, want %v", got, tt.want)
  1011. }
  1012. if !reflect.DeepEqual(tt.args.currentAssignment, tt.wantCurrentAssignment) {
  1013. t.Errorf("assignPartition() currentAssignment = %v, want %v", tt.args.currentAssignment, tt.wantCurrentAssignment)
  1014. }
  1015. if !reflect.DeepEqual(tt.args.currentPartitionConsumer, tt.wantCurrentPartitionConsumer) {
  1016. t.Errorf("assignPartition() currentPartitionConsumer = %v, want %v", tt.args.currentPartitionConsumer, tt.wantCurrentPartitionConsumer)
  1017. }
  1018. })
  1019. }
  1020. }
  1021. func Test_stickyBalanceStrategy_Plan(t *testing.T) {
  1022. type args struct {
  1023. members map[string]ConsumerGroupMemberMetadata
  1024. topics map[string][]int32
  1025. }
  1026. tests := []struct {
  1027. name string
  1028. s *stickyBalanceStrategy
  1029. args args
  1030. }{
  1031. {
  1032. name: "One consumer with no topics",
  1033. args: args{
  1034. members: map[string]ConsumerGroupMemberMetadata{
  1035. "consumer": {},
  1036. },
  1037. topics: make(map[string][]int32),
  1038. },
  1039. },
  1040. {
  1041. name: "One consumer with non-existent topic",
  1042. args: args{
  1043. members: map[string]ConsumerGroupMemberMetadata{
  1044. "consumer": {
  1045. Topics: []string{"topic"},
  1046. },
  1047. },
  1048. topics: map[string][]int32{
  1049. "topic": make([]int32, 0),
  1050. },
  1051. },
  1052. },
  1053. {
  1054. name: "One consumer with one topic",
  1055. args: args{
  1056. members: map[string]ConsumerGroupMemberMetadata{
  1057. "consumer": {
  1058. Topics: []string{"topic"},
  1059. },
  1060. },
  1061. topics: map[string][]int32{
  1062. "topic": {0, 1, 2},
  1063. },
  1064. },
  1065. },
  1066. {
  1067. name: "Only assigns partitions from subscribed topics",
  1068. args: args{
  1069. members: map[string]ConsumerGroupMemberMetadata{
  1070. "consumer": {
  1071. Topics: []string{"topic"},
  1072. },
  1073. },
  1074. topics: map[string][]int32{
  1075. "topic": {0, 1, 2},
  1076. "other": {0, 1, 2},
  1077. },
  1078. },
  1079. },
  1080. {
  1081. name: "One consumer with multiple topics",
  1082. args: args{
  1083. members: map[string]ConsumerGroupMemberMetadata{
  1084. "consumer": {
  1085. Topics: []string{"topic1", "topic2"},
  1086. },
  1087. },
  1088. topics: map[string][]int32{
  1089. "topic1": {0},
  1090. "topic2": {0, 1},
  1091. },
  1092. },
  1093. },
  1094. {
  1095. name: "Two consumers with one topic and one partition",
  1096. args: args{
  1097. members: map[string]ConsumerGroupMemberMetadata{
  1098. "consumer1": {
  1099. Topics: []string{"topic"},
  1100. },
  1101. "consumer2": {
  1102. Topics: []string{"topic"},
  1103. },
  1104. },
  1105. topics: map[string][]int32{
  1106. "topic": {0},
  1107. },
  1108. },
  1109. },
  1110. {
  1111. name: "Two consumers with one topic and two partitions",
  1112. args: args{
  1113. members: map[string]ConsumerGroupMemberMetadata{
  1114. "consumer1": {
  1115. Topics: []string{"topic"},
  1116. },
  1117. "consumer2": {
  1118. Topics: []string{"topic"},
  1119. },
  1120. },
  1121. topics: map[string][]int32{
  1122. "topic": {0, 1},
  1123. },
  1124. },
  1125. },
  1126. {
  1127. name: "Multiple consumers with mixed topic subscriptions",
  1128. args: args{
  1129. members: map[string]ConsumerGroupMemberMetadata{
  1130. "consumer1": {
  1131. Topics: []string{"topic1"},
  1132. },
  1133. "consumer2": {
  1134. Topics: []string{"topic1", "topic2"},
  1135. },
  1136. "consumer3": {
  1137. Topics: []string{"topic1"},
  1138. },
  1139. },
  1140. topics: map[string][]int32{
  1141. "topic1": {0, 1, 2},
  1142. "topic2": {0, 1},
  1143. },
  1144. },
  1145. },
  1146. {
  1147. name: "Two consumers with two topics and six partitions",
  1148. args: args{
  1149. members: map[string]ConsumerGroupMemberMetadata{
  1150. "consumer1": {
  1151. Topics: []string{"topic1", "topic2"},
  1152. },
  1153. "consumer2": {
  1154. Topics: []string{"topic1", "topic2"},
  1155. },
  1156. },
  1157. topics: map[string][]int32{
  1158. "topic1": {0, 1, 2},
  1159. "topic2": {0, 1, 2},
  1160. },
  1161. },
  1162. },
  1163. {
  1164. name: "Three consumers (two old, one new) with one topic and twelve partitions",
  1165. args: args{
  1166. members: map[string]ConsumerGroupMemberMetadata{
  1167. "consumer1": {
  1168. Topics: []string{"topic1"},
  1169. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {4, 11, 8, 5, 9, 2}}, 1),
  1170. },
  1171. "consumer2": {
  1172. Topics: []string{"topic1"},
  1173. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {1, 3, 0, 7, 10, 6}}, 1),
  1174. },
  1175. "consumer3": {
  1176. Topics: []string{"topic1"},
  1177. },
  1178. },
  1179. topics: map[string][]int32{
  1180. "topic1": {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
  1181. },
  1182. },
  1183. },
  1184. {
  1185. name: "Three consumers (two old, one new) with one topic and 13 partitions",
  1186. args: args{
  1187. members: map[string]ConsumerGroupMemberMetadata{
  1188. "consumer1": {
  1189. Topics: []string{"topic1"},
  1190. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {4, 11, 8, 5, 9, 2, 6}}, 1),
  1191. },
  1192. "consumer2": {
  1193. Topics: []string{"topic1"},
  1194. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {1, 3, 0, 7, 10, 12}}, 1),
  1195. },
  1196. "consumer3": {
  1197. Topics: []string{"topic1"},
  1198. },
  1199. },
  1200. topics: map[string][]int32{
  1201. "topic1": {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
  1202. },
  1203. },
  1204. },
  1205. {
  1206. name: "One consumer that is no longer subscribed to a topic that it had previously been consuming from",
  1207. args: args{
  1208. members: map[string]ConsumerGroupMemberMetadata{
  1209. "consumer1": {
  1210. Topics: []string{"topic2"},
  1211. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0}}, 1),
  1212. },
  1213. },
  1214. topics: map[string][]int32{
  1215. "topic1": {0},
  1216. "topic2": {0},
  1217. },
  1218. },
  1219. },
  1220. {
  1221. name: "Two consumers where one is no longer interested in consuming from a topic that it had been consuming from",
  1222. args: args{
  1223. members: map[string]ConsumerGroupMemberMetadata{
  1224. "consumer1": {
  1225. Topics: []string{"topic2"},
  1226. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0}}, 1),
  1227. },
  1228. "consumer2": {
  1229. Topics: []string{"topic1", "topic2"},
  1230. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {1}}, 1),
  1231. },
  1232. },
  1233. topics: map[string][]int32{
  1234. "topic1": {0, 1},
  1235. "topic2": {0, 1},
  1236. },
  1237. },
  1238. },
  1239. }
  1240. for _, tt := range tests {
  1241. t.Run(tt.name, func(t *testing.T) {
  1242. s := &stickyBalanceStrategy{}
  1243. plan, err := s.Plan(tt.args.members, tt.args.topics)
  1244. verifyPlanIsBalancedAndSticky(t, s, tt.args.members, plan, err)
  1245. verifyFullyBalanced(t, plan)
  1246. })
  1247. }
  1248. }
  1249. func Test_stickyBalanceStrategy_Plan_KIP54_ExampleOne(t *testing.T) {
  1250. s := &stickyBalanceStrategy{}
  1251. // PLAN 1
  1252. members := map[string]ConsumerGroupMemberMetadata{
  1253. "consumer1": {
  1254. Topics: []string{"topic1", "topic2", "topic3", "topic4"},
  1255. },
  1256. "consumer2": {
  1257. Topics: []string{"topic1", "topic2", "topic3", "topic4"},
  1258. },
  1259. "consumer3": {
  1260. Topics: []string{"topic1", "topic2", "topic3", "topic4"},
  1261. },
  1262. }
  1263. topics := map[string][]int32{
  1264. "topic1": {0, 1},
  1265. "topic2": {0, 1},
  1266. "topic3": {0, 1},
  1267. "topic4": {0, 1},
  1268. }
  1269. plan1, err := s.Plan(members, topics)
  1270. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1271. verifyFullyBalanced(t, plan1)
  1272. // PLAN 2
  1273. delete(members, "consumer1")
  1274. members["consumer2"] = ConsumerGroupMemberMetadata{
  1275. Topics: []string{"topic1", "topic2", "topic3", "topic4"},
  1276. UserData: encodeSubscriberPlan(t, plan1["consumer2"]),
  1277. }
  1278. members["consumer3"] = ConsumerGroupMemberMetadata{
  1279. Topics: []string{"topic1", "topic2", "topic3", "topic4"},
  1280. UserData: encodeSubscriberPlan(t, plan1["consumer3"]),
  1281. }
  1282. plan2, err := s.Plan(members, topics)
  1283. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1284. verifyFullyBalanced(t, plan2)
  1285. }
  1286. func Test_stickyBalanceStrategy_Plan_KIP54_ExampleTwo(t *testing.T) {
  1287. s := &stickyBalanceStrategy{}
  1288. // PLAN 1
  1289. members := map[string]ConsumerGroupMemberMetadata{
  1290. "consumer1": {
  1291. Topics: []string{"topic1"},
  1292. },
  1293. "consumer2": {
  1294. Topics: []string{"topic1", "topic2"},
  1295. },
  1296. "consumer3": {
  1297. Topics: []string{"topic1", "topic2", "topic3"},
  1298. },
  1299. }
  1300. topics := map[string][]int32{
  1301. "topic1": {0},
  1302. "topic2": {0, 1},
  1303. "topic3": {0, 1, 2},
  1304. }
  1305. plan1, err := s.Plan(members, topics)
  1306. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1307. if len(plan1["consumer1"]["topic1"]) != 1 || len(plan1["consumer2"]["topic2"]) != 2 || len(plan1["consumer3"]["topic3"]) != 3 {
  1308. t.Error("Incorrect distribution of topic partition assignments")
  1309. }
  1310. // PLAN 2
  1311. delete(members, "consumer1")
  1312. members["consumer2"] = ConsumerGroupMemberMetadata{
  1313. Topics: members["consumer2"].Topics,
  1314. UserData: encodeSubscriberPlan(t, plan1["consumer2"]),
  1315. }
  1316. members["consumer3"] = ConsumerGroupMemberMetadata{
  1317. Topics: members["consumer3"].Topics,
  1318. UserData: encodeSubscriberPlan(t, plan1["consumer3"]),
  1319. }
  1320. plan2, err := s.Plan(members, topics)
  1321. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1322. verifyFullyBalanced(t, plan2)
  1323. if len(plan2["consumer2"]["topic1"]) != 1 || len(plan2["consumer2"]["topic2"]) != 2 || len(plan2["consumer3"]["topic3"]) != 3 {
  1324. t.Error("Incorrect distribution of topic partition assignments")
  1325. }
  1326. }
  1327. func Test_stickyBalanceStrategy_Plan_KIP54_ExampleThree(t *testing.T) {
  1328. s := &stickyBalanceStrategy{}
  1329. topicNames := []string{"topic1", "topic2"}
  1330. // PLAN 1
  1331. members := map[string]ConsumerGroupMemberMetadata{
  1332. "consumer1": {
  1333. Topics: topicNames,
  1334. },
  1335. "consumer2": {
  1336. Topics: topicNames,
  1337. },
  1338. }
  1339. topics := map[string][]int32{
  1340. "topic1": {0, 1},
  1341. "topic2": {0, 1},
  1342. }
  1343. plan1, err := s.Plan(members, topics)
  1344. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1345. // PLAN 2
  1346. members["consumer1"] = ConsumerGroupMemberMetadata{
  1347. Topics: topicNames,
  1348. }
  1349. members["consumer2"] = ConsumerGroupMemberMetadata{
  1350. Topics: topicNames,
  1351. UserData: encodeSubscriberPlan(t, plan1["consumer2"]),
  1352. }
  1353. members["consumer3"] = ConsumerGroupMemberMetadata{
  1354. Topics: topicNames,
  1355. UserData: encodeSubscriberPlan(t, plan1["consumer3"]),
  1356. }
  1357. plan2, err := s.Plan(members, topics)
  1358. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1359. verifyFullyBalanced(t, plan2)
  1360. }
  1361. func Test_stickyBalanceStrategy_Plan_AddRemoveConsumerOneTopic(t *testing.T) {
  1362. s := &stickyBalanceStrategy{}
  1363. // PLAN 1
  1364. members := map[string]ConsumerGroupMemberMetadata{
  1365. "consumer1": {
  1366. Topics: []string{"topic"},
  1367. },
  1368. }
  1369. topics := map[string][]int32{
  1370. "topic": {0, 1, 2},
  1371. }
  1372. plan1, err := s.Plan(members, topics)
  1373. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1374. // PLAN 2
  1375. members["consumer1"] = ConsumerGroupMemberMetadata{
  1376. Topics: []string{"topic"},
  1377. UserData: encodeSubscriberPlan(t, plan1["consumer1"]),
  1378. }
  1379. members["consumer2"] = ConsumerGroupMemberMetadata{
  1380. Topics: []string{"topic"},
  1381. }
  1382. plan2, err := s.Plan(members, topics)
  1383. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1384. // PLAN 3
  1385. delete(members, "consumer1")
  1386. members["consumer2"] = ConsumerGroupMemberMetadata{
  1387. Topics: []string{"topic"},
  1388. UserData: encodeSubscriberPlan(t, plan2["consumer2"]),
  1389. }
  1390. plan3, err := s.Plan(members, topics)
  1391. verifyPlanIsBalancedAndSticky(t, s, members, plan3, err)
  1392. }
  1393. func Test_stickyBalanceStrategy_Plan_PoorRoundRobinAssignmentScenario(t *testing.T) {
  1394. s := &stickyBalanceStrategy{}
  1395. // PLAN 1
  1396. members := map[string]ConsumerGroupMemberMetadata{
  1397. "consumer1": {
  1398. Topics: []string{"topic1", "topic2", "topic3", "topic4", "topic5"},
  1399. },
  1400. "consumer2": {
  1401. Topics: []string{"topic1", "topic3", "topic5"},
  1402. },
  1403. "consumer3": {
  1404. Topics: []string{"topic1", "topic3", "topic5"},
  1405. },
  1406. "consumer4": {
  1407. Topics: []string{"topic1", "topic2", "topic3", "topic4", "topic5"},
  1408. },
  1409. }
  1410. topics := make(map[string][]int32, 5)
  1411. for i := 1; i <= 5; i++ {
  1412. partitions := make([]int32, i%2+1)
  1413. for j := 0; j < i%2+1; j++ {
  1414. partitions[j] = int32(j)
  1415. }
  1416. topics[fmt.Sprintf("topic%d", i)] = partitions
  1417. }
  1418. plan, err := s.Plan(members, topics)
  1419. verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
  1420. }
  1421. func Test_stickyBalanceStrategy_Plan_AddRemoveTopicTwoConsumers(t *testing.T) {
  1422. s := &stickyBalanceStrategy{}
  1423. // PLAN 1
  1424. members := map[string]ConsumerGroupMemberMetadata{
  1425. "consumer1": {
  1426. Topics: []string{"topic1"},
  1427. },
  1428. "consumer2": {
  1429. Topics: []string{"topic1"},
  1430. },
  1431. }
  1432. topics := map[string][]int32{
  1433. "topic1": {0, 1, 2},
  1434. }
  1435. plan1, err := s.Plan(members, topics)
  1436. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1437. verifyFullyBalanced(t, plan1)
  1438. // PLAN 2
  1439. members["consumer1"] = ConsumerGroupMemberMetadata{
  1440. Topics: []string{"topic1", "topic2"},
  1441. UserData: encodeSubscriberPlan(t, plan1["consumer1"]),
  1442. }
  1443. members["consumer2"] = ConsumerGroupMemberMetadata{
  1444. Topics: []string{"topic1", "topic2"},
  1445. UserData: encodeSubscriberPlan(t, plan1["consumer2"]),
  1446. }
  1447. topics["topic2"] = []int32{0, 1, 2}
  1448. plan2, err := s.Plan(members, topics)
  1449. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1450. verifyFullyBalanced(t, plan2)
  1451. // PLAN 3
  1452. members["consumer1"] = ConsumerGroupMemberMetadata{
  1453. Topics: []string{"topic1", "topic2"},
  1454. UserData: encodeSubscriberPlan(t, plan2["consumer1"]),
  1455. }
  1456. members["consumer2"] = ConsumerGroupMemberMetadata{
  1457. Topics: []string{"topic1", "topic2"},
  1458. UserData: encodeSubscriberPlan(t, plan2["consumer2"]),
  1459. }
  1460. delete(topics, "topic1")
  1461. plan3, err := s.Plan(members, topics)
  1462. verifyPlanIsBalancedAndSticky(t, s, members, plan3, err)
  1463. verifyFullyBalanced(t, plan3)
  1464. }
  1465. func Test_stickyBalanceStrategy_Plan_ReassignmentAfterOneConsumerLeaves(t *testing.T) {
  1466. s := &stickyBalanceStrategy{}
  1467. // PLAN 1
  1468. members := make(map[string]ConsumerGroupMemberMetadata, 20)
  1469. for i := 0; i < 20; i++ {
  1470. topics := make([]string, 20)
  1471. for j := 0; j < 20; j++ {
  1472. topics[j] = fmt.Sprintf("topic%d", j)
  1473. }
  1474. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
  1475. }
  1476. topics := make(map[string][]int32, 20)
  1477. for i := 0; i < 20; i++ {
  1478. partitions := make([]int32, 20)
  1479. for j := 0; j < 20; j++ {
  1480. partitions[j] = int32(j)
  1481. }
  1482. topics[fmt.Sprintf("topic%d", i)] = partitions
  1483. }
  1484. plan1, err := s.Plan(members, topics)
  1485. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1486. for i := 0; i < 20; i++ {
  1487. topics := make([]string, 20)
  1488. for j := 0; j < 20; j++ {
  1489. topics[j] = fmt.Sprintf("topic%d", j)
  1490. }
  1491. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
  1492. Topics: members[fmt.Sprintf("consumer%d", i)].Topics,
  1493. UserData: encodeSubscriberPlan(t, plan1[fmt.Sprintf("consumer%d", i)]),
  1494. }
  1495. }
  1496. delete(members, "consumer10")
  1497. plan2, err := s.Plan(members, topics)
  1498. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1499. }
  1500. func Test_stickyBalanceStrategy_Plan_ReassignmentAfterOneConsumerAdded(t *testing.T) {
  1501. s := &stickyBalanceStrategy{}
  1502. // PLAN 1
  1503. members := make(map[string]ConsumerGroupMemberMetadata)
  1504. for i := 0; i < 10; i++ {
  1505. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: []string{"topic1"}}
  1506. }
  1507. partitions := make([]int32, 20)
  1508. for j := 0; j < 20; j++ {
  1509. partitions[j] = int32(j)
  1510. }
  1511. topics := map[string][]int32{"topic1": partitions}
  1512. plan1, err := s.Plan(members, topics)
  1513. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1514. // add a new consumer
  1515. members["consumer10"] = ConsumerGroupMemberMetadata{Topics: []string{"topic1"}}
  1516. plan2, err := s.Plan(members, topics)
  1517. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1518. }
  1519. func Test_stickyBalanceStrategy_Plan_SameSubscriptions(t *testing.T) {
  1520. s := &stickyBalanceStrategy{}
  1521. // PLAN 1
  1522. members := make(map[string]ConsumerGroupMemberMetadata, 20)
  1523. for i := 0; i < 9; i++ {
  1524. topics := make([]string, 15)
  1525. for j := 0; j < 15; j++ {
  1526. topics[j] = fmt.Sprintf("topic%d", j)
  1527. }
  1528. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
  1529. }
  1530. topics := make(map[string][]int32, 15)
  1531. for i := 0; i < 15; i++ {
  1532. partitions := make([]int32, i)
  1533. for j := 0; j < i; j++ {
  1534. partitions[j] = int32(j)
  1535. }
  1536. topics[fmt.Sprintf("topic%d", i)] = partitions
  1537. }
  1538. plan1, err := s.Plan(members, topics)
  1539. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1540. // PLAN 2
  1541. for i := 0; i < 9; i++ {
  1542. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
  1543. Topics: members[fmt.Sprintf("consumer%d", i)].Topics,
  1544. UserData: encodeSubscriberPlan(t, plan1[fmt.Sprintf("consumer%d", i)]),
  1545. }
  1546. }
  1547. delete(members, "consumer5")
  1548. plan2, err := s.Plan(members, topics)
  1549. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1550. }
  1551. func Test_stickyBalanceStrategy_Plan_LargeAssignmentWithMultipleConsumersLeaving(t *testing.T) {
  1552. s := &stickyBalanceStrategy{}
  1553. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  1554. // PLAN 1
  1555. members := make(map[string]ConsumerGroupMemberMetadata, 20)
  1556. for i := 0; i < 200; i++ {
  1557. topics := make([]string, 200)
  1558. for j := 0; j < 200; j++ {
  1559. topics[j] = fmt.Sprintf("topic%d", j)
  1560. }
  1561. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
  1562. }
  1563. topics := make(map[string][]int32, 40)
  1564. for i := 0; i < 40; i++ {
  1565. partitionCount := r.Intn(20)
  1566. partitions := make([]int32, partitionCount)
  1567. for j := 0; j < partitionCount; j++ {
  1568. partitions[j] = int32(j)
  1569. }
  1570. topics[fmt.Sprintf("topic%d", i)] = partitions
  1571. }
  1572. plan1, err := s.Plan(members, topics)
  1573. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1574. for i := 0; i < 200; i++ {
  1575. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
  1576. Topics: members[fmt.Sprintf("consumer%d", i)].Topics,
  1577. UserData: encodeSubscriberPlan(t, plan1[fmt.Sprintf("consumer%d", i)]),
  1578. }
  1579. }
  1580. for i := 0; i < 50; i++ {
  1581. delete(members, fmt.Sprintf("consumer%d", i))
  1582. }
  1583. plan2, err := s.Plan(members, topics)
  1584. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1585. }
  1586. func Test_stickyBalanceStrategy_Plan_NewSubscription(t *testing.T) {
  1587. s := &stickyBalanceStrategy{}
  1588. members := make(map[string]ConsumerGroupMemberMetadata, 20)
  1589. for i := 0; i < 3; i++ {
  1590. topics := make([]string, 0)
  1591. for j := i; j <= 3*i-2; j++ {
  1592. topics = append(topics, fmt.Sprintf("topic%d", j))
  1593. }
  1594. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
  1595. }
  1596. topics := make(map[string][]int32, 5)
  1597. for i := 1; i < 5; i++ {
  1598. topics[fmt.Sprintf("topic%d", i)] = []int32{0}
  1599. }
  1600. plan1, err := s.Plan(members, topics)
  1601. if err != nil {
  1602. t.Errorf("stickyBalanceStrategy.Plan() error = %v", err)
  1603. return
  1604. }
  1605. verifyValidityAndBalance(t, members, plan1)
  1606. members["consumer0"] = ConsumerGroupMemberMetadata{Topics: []string{"topic1"}}
  1607. plan2, err := s.Plan(members, topics)
  1608. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1609. }
  1610. func Test_stickyBalanceStrategy_Plan_ReassignmentWithRandomSubscriptionsAndChanges(t *testing.T) {
  1611. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  1612. minNumConsumers := 20
  1613. maxNumConsumers := 40
  1614. minNumTopics := 10
  1615. maxNumTopics := 20
  1616. for round := 0; round < 100; round++ {
  1617. numTopics := minNumTopics + r.Intn(maxNumTopics-minNumTopics)
  1618. topics := make([]string, numTopics)
  1619. partitionsPerTopic := make(map[string][]int32, numTopics)
  1620. for i := 0; i < numTopics; i++ {
  1621. topicName := fmt.Sprintf("topic%d", i)
  1622. topics[i] = topicName
  1623. partitions := make([]int32, maxNumTopics)
  1624. for j := 0; j < maxNumTopics; j++ {
  1625. partitions[j] = int32(j)
  1626. }
  1627. partitionsPerTopic[topicName] = partitions
  1628. }
  1629. numConsumers := minNumConsumers + r.Intn(maxNumConsumers-minNumConsumers)
  1630. members := make(map[string]ConsumerGroupMemberMetadata, numConsumers)
  1631. for i := 0; i < numConsumers; i++ {
  1632. sub := getRandomSublist(r, topics)
  1633. sort.Strings(sub)
  1634. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: sub}
  1635. }
  1636. s := &stickyBalanceStrategy{}
  1637. plan, err := s.Plan(members, partitionsPerTopic)
  1638. verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
  1639. // PLAN 2
  1640. membersPlan2 := make(map[string]ConsumerGroupMemberMetadata, numConsumers)
  1641. for i := 0; i < numConsumers; i++ {
  1642. sub := getRandomSublist(r, topics)
  1643. sort.Strings(sub)
  1644. membersPlan2[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
  1645. Topics: sub,
  1646. UserData: encodeSubscriberPlan(t, plan[fmt.Sprintf("consumer%d", i)]),
  1647. }
  1648. }
  1649. plan2, err := s.Plan(membersPlan2, partitionsPerTopic)
  1650. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1651. }
  1652. }
  1653. func Test_stickyBalanceStrategy_Plan_MoveExistingAssignments(t *testing.T) {
  1654. s := &stickyBalanceStrategy{}
  1655. topics := make(map[string][]int32, 6)
  1656. for i := 1; i <= 6; i++ {
  1657. topics[fmt.Sprintf("topic%d", i)] = []int32{0}
  1658. }
  1659. members := make(map[string]ConsumerGroupMemberMetadata, 3)
  1660. members["consumer1"] = ConsumerGroupMemberMetadata{
  1661. Topics: []string{"topic1", "topic2"},
  1662. UserData: encodeSubscriberPlan(t, map[string][]int32{"topic1": {0}}),
  1663. }
  1664. members["consumer2"] = ConsumerGroupMemberMetadata{
  1665. Topics: []string{"topic1", "topic2", "topic3", "topic4"},
  1666. UserData: encodeSubscriberPlan(t, map[string][]int32{"topic2": {0}, "topic3": {0}}),
  1667. }
  1668. members["consumer3"] = ConsumerGroupMemberMetadata{
  1669. Topics: []string{"topic2", "topic3", "topic4", "topic5", "topic6"},
  1670. UserData: encodeSubscriberPlan(t, map[string][]int32{"topic4": {0}, "topic5": {0}, "topic6": {0}}),
  1671. }
  1672. plan, err := s.Plan(members, topics)
  1673. verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
  1674. }
  1675. func Test_stickyBalanceStrategy_Plan_Stickiness(t *testing.T) {
  1676. s := &stickyBalanceStrategy{}
  1677. topics := map[string][]int32{"topic1": {0, 1, 2}}
  1678. members := map[string]ConsumerGroupMemberMetadata{
  1679. "consumer1": {Topics: []string{"topic1"}},
  1680. "consumer2": {Topics: []string{"topic1"}},
  1681. "consumer3": {Topics: []string{"topic1"}},
  1682. "consumer4": {Topics: []string{"topic1"}},
  1683. }
  1684. plan1, err := s.Plan(members, topics)
  1685. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1686. // PLAN 2
  1687. // remove the potential group leader
  1688. delete(members, "consumer1")
  1689. for i := 2; i <= 4; i++ {
  1690. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
  1691. Topics: []string{"topic1"},
  1692. UserData: encodeSubscriberPlan(t, plan1[fmt.Sprintf("consumer%d", i)]),
  1693. }
  1694. }
  1695. plan2, err := s.Plan(members, topics)
  1696. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1697. }
  1698. func Test_stickyBalanceStrategy_Plan_AssignmentUpdatedForDeletedTopic(t *testing.T) {
  1699. s := &stickyBalanceStrategy{}
  1700. topics := make(map[string][]int32, 2)
  1701. topics["topic1"] = []int32{0}
  1702. topics["topic3"] = make([]int32, 100)
  1703. for i := 0; i < 100; i++ {
  1704. topics["topic3"][i] = int32(i)
  1705. }
  1706. members := map[string]ConsumerGroupMemberMetadata{
  1707. "consumer1": {Topics: []string{"topic1", "topic2", "topic3"}},
  1708. }
  1709. plan, err := s.Plan(members, topics)
  1710. verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
  1711. verifyFullyBalanced(t, plan)
  1712. if (len(plan["consumer1"]["topic1"]) + len(plan["consumer1"]["topic3"])) != 101 {
  1713. t.Error("Incorrect number of partitions assigned")
  1714. return
  1715. }
  1716. }
  1717. func Test_stickyBalanceStrategy_Plan_NoExceptionRaisedWhenOnlySubscribedTopicDeleted(t *testing.T) {
  1718. s := &stickyBalanceStrategy{}
  1719. topics := map[string][]int32{"topic1": {0, 1, 2}}
  1720. members := map[string]ConsumerGroupMemberMetadata{
  1721. "consumer1": {Topics: []string{"topic1"}},
  1722. }
  1723. plan1, err := s.Plan(members, topics)
  1724. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1725. // PLAN 2
  1726. members["consumer1"] = ConsumerGroupMemberMetadata{
  1727. Topics: members["consumer1"].Topics,
  1728. UserData: encodeSubscriberPlan(t, plan1["consumer1"]),
  1729. }
  1730. plan2, err := s.Plan(members, map[string][]int32{})
  1731. if len(plan2) != 1 {
  1732. t.Error("Incorrect number of consumers")
  1733. return
  1734. }
  1735. if len(plan2["consumer1"]) != 0 {
  1736. t.Error("Incorrect number of consumer topic assignments")
  1737. return
  1738. }
  1739. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1740. }
  1741. func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations1(t *testing.T) {
  1742. s := &stickyBalanceStrategy{}
  1743. topics := map[string][]int32{"topic1": {0, 1, 2, 3, 4, 5}}
  1744. members := map[string]ConsumerGroupMemberMetadata{
  1745. "consumer1": {Topics: []string{"topic1"}},
  1746. "consumer2": {Topics: []string{"topic1"}},
  1747. "consumer3": {Topics: []string{"topic1"}},
  1748. }
  1749. plan1, err := s.Plan(members, topics)
  1750. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1751. verifyFullyBalanced(t, plan1)
  1752. // PLAN 2
  1753. members["consumer1"] = ConsumerGroupMemberMetadata{
  1754. Topics: []string{"topic1"},
  1755. UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer1"], 1),
  1756. }
  1757. members["consumer2"] = ConsumerGroupMemberMetadata{
  1758. Topics: []string{"topic1"},
  1759. UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer2"], 1),
  1760. }
  1761. delete(members, "consumer3")
  1762. plan2, err := s.Plan(members, topics)
  1763. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1764. verifyFullyBalanced(t, plan2)
  1765. if len(intersection(plan1["consumer1"]["topic1"], plan2["consumer1"]["topic1"])) != 2 {
  1766. t.Error("stickyBalanceStrategy.Plan() consumer1 didn't maintain partitions across reassignment")
  1767. }
  1768. if len(intersection(plan1["consumer2"]["topic1"], plan2["consumer2"]["topic1"])) != 2 {
  1769. t.Error("stickyBalanceStrategy.Plan() consumer1 didn't maintain partitions across reassignment")
  1770. }
  1771. // PLAN 3
  1772. delete(members, "consumer1")
  1773. members["consumer2"] = ConsumerGroupMemberMetadata{
  1774. Topics: []string{"topic1"},
  1775. UserData: encodeSubscriberPlanWithGeneration(t, plan2["consumer2"], 2),
  1776. }
  1777. members["consumer3"] = ConsumerGroupMemberMetadata{
  1778. Topics: []string{"topic1"},
  1779. UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer3"], 1),
  1780. }
  1781. plan3, err := s.Plan(members, topics)
  1782. verifyPlanIsBalancedAndSticky(t, s, members, plan3, err)
  1783. verifyFullyBalanced(t, plan3)
  1784. }
  1785. func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations2(t *testing.T) {
  1786. s := &stickyBalanceStrategy{}
  1787. topics := map[string][]int32{"topic1": {0, 1, 2, 3, 4, 5}}
  1788. members := map[string]ConsumerGroupMemberMetadata{
  1789. "consumer1": {Topics: []string{"topic1"}},
  1790. "consumer2": {Topics: []string{"topic1"}},
  1791. "consumer3": {Topics: []string{"topic1"}},
  1792. }
  1793. plan1, err := s.Plan(members, topics)
  1794. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1795. verifyFullyBalanced(t, plan1)
  1796. // PLAN 2
  1797. delete(members, "consumer1")
  1798. members["consumer2"] = ConsumerGroupMemberMetadata{
  1799. Topics: []string{"topic1"},
  1800. UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer2"], 1),
  1801. }
  1802. delete(members, "consumer3")
  1803. plan2, err := s.Plan(members, topics)
  1804. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1805. verifyFullyBalanced(t, plan2)
  1806. if len(intersection(plan1["consumer2"]["topic1"], plan2["consumer2"]["topic1"])) != 2 {
  1807. t.Error("stickyBalanceStrategy.Plan() consumer1 didn't maintain partitions across reassignment")
  1808. }
  1809. // PLAN 3
  1810. members["consumer1"] = ConsumerGroupMemberMetadata{
  1811. Topics: []string{"topic1"},
  1812. UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer1"], 1),
  1813. }
  1814. members["consumer2"] = ConsumerGroupMemberMetadata{
  1815. Topics: []string{"topic1"},
  1816. UserData: encodeSubscriberPlanWithGeneration(t, plan2["consumer2"], 2),
  1817. }
  1818. members["consumer3"] = ConsumerGroupMemberMetadata{
  1819. Topics: []string{"topic1"},
  1820. UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer3"], 1),
  1821. }
  1822. plan3, err := s.Plan(members, topics)
  1823. verifyPlanIsBalancedAndSticky(t, s, members, plan3, err)
  1824. verifyFullyBalanced(t, plan3)
  1825. }
  1826. func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGenerations(t *testing.T) {
  1827. s := &stickyBalanceStrategy{}
  1828. topics := map[string][]int32{"topic1": {0, 1, 2, 3, 4, 5}}
  1829. members := make(map[string]ConsumerGroupMemberMetadata, 3)
  1830. members["consumer1"] = ConsumerGroupMemberMetadata{
  1831. Topics: []string{"topic1"},
  1832. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 1, 4}}, 1),
  1833. }
  1834. members["consumer2"] = ConsumerGroupMemberMetadata{
  1835. Topics: []string{"topic1"},
  1836. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 2, 3}}, 1),
  1837. }
  1838. members["consumer3"] = ConsumerGroupMemberMetadata{
  1839. Topics: []string{"topic1"},
  1840. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {3, 4, 5}}, 2),
  1841. }
  1842. plan, err := s.Plan(members, topics)
  1843. verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
  1844. verifyFullyBalanced(t, plan)
  1845. }
  1846. func Test_stickyBalanceStrategy_Plan_SchemaBackwardCompatibility(t *testing.T) {
  1847. s := &stickyBalanceStrategy{}
  1848. topics := map[string][]int32{"topic1": {0, 1, 2}}
  1849. members := make(map[string]ConsumerGroupMemberMetadata, 3)
  1850. members["consumer1"] = ConsumerGroupMemberMetadata{
  1851. Topics: []string{"topic1"},
  1852. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 2}}, 1),
  1853. }
  1854. members["consumer2"] = ConsumerGroupMemberMetadata{
  1855. Topics: []string{"topic1"},
  1856. UserData: encodeSubscriberPlanWithOldSchema(t, map[string][]int32{"topic1": {1}}),
  1857. }
  1858. members["consumer3"] = ConsumerGroupMemberMetadata{Topics: []string{"topic1"}}
  1859. plan, err := s.Plan(members, topics)
  1860. verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
  1861. verifyFullyBalanced(t, plan)
  1862. }
  1863. func Test_stickyBalanceStrategy_Plan_ConflictingPreviousAssignments(t *testing.T) {
  1864. s := &stickyBalanceStrategy{}
  1865. topics := map[string][]int32{"topic1": {0, 1}}
  1866. members := make(map[string]ConsumerGroupMemberMetadata, 2)
  1867. members["consumer1"] = ConsumerGroupMemberMetadata{
  1868. Topics: []string{"topic1"},
  1869. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 1}}, 1),
  1870. }
  1871. members["consumer2"] = ConsumerGroupMemberMetadata{
  1872. Topics: []string{"topic1"},
  1873. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 1}}, 1),
  1874. }
  1875. plan, err := s.Plan(members, topics)
  1876. verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
  1877. verifyFullyBalanced(t, plan)
  1878. }
  1879. func Test_stickyBalanceStrategy_Plan_AssignmentData(t *testing.T) {
  1880. s := &stickyBalanceStrategy{}
  1881. members := make(map[string]ConsumerGroupMemberMetadata, 2)
  1882. members["consumer1"] = ConsumerGroupMemberMetadata{
  1883. Topics: []string{"topic1"},
  1884. }
  1885. members["consumer2"] = ConsumerGroupMemberMetadata{
  1886. Topics: []string{"topic1"},
  1887. }
  1888. expected := encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 1}}, 1)
  1889. actual, err := s.AssignmentData("consumer1", map[string][]int32{"topic1": {0, 1}}, 1)
  1890. if err != nil {
  1891. t.Errorf("Error building assignment data: %v", err)
  1892. }
  1893. if !bytes.Equal(expected, actual) {
  1894. t.Error("Invalid assignment data returned from AssignmentData")
  1895. }
  1896. }
  1897. func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopics(b *testing.B) {
  1898. s := &stickyBalanceStrategy{}
  1899. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  1900. members := make(map[string]ConsumerGroupMemberMetadata, 20)
  1901. for i := 0; i < 200; i++ {
  1902. topics := make([]string, 200)
  1903. for j := 0; j < 200; j++ {
  1904. topics[j] = fmt.Sprintf("topic%d", j)
  1905. }
  1906. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
  1907. }
  1908. topics := make(map[string][]int32, 40)
  1909. for i := 0; i < 40; i++ {
  1910. partitionCount := r.Intn(20)
  1911. partitions := make([]int32, partitionCount)
  1912. for j := 0; j < partitionCount; j++ {
  1913. partitions[j] = int32(j)
  1914. }
  1915. topics[fmt.Sprintf("topic%d", i)] = partitions
  1916. }
  1917. b.ResetTimer()
  1918. for n := 0; n < b.N; n++ {
  1919. if _, err := s.Plan(members, topics); err != nil {
  1920. b.Errorf("Error building plan in benchmark: %v", err)
  1921. }
  1922. }
  1923. }
  1924. func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopicsAndExistingAssignments(b *testing.B) {
  1925. s := &stickyBalanceStrategy{}
  1926. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  1927. members := make(map[string]ConsumerGroupMemberMetadata, 20)
  1928. for i := 0; i < 200; i++ {
  1929. topics := make([]string, 200)
  1930. for j := 0; j < 200; j++ {
  1931. topics[j] = fmt.Sprintf("topic%d", j)
  1932. }
  1933. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
  1934. }
  1935. topics := make(map[string][]int32, 40)
  1936. for i := 0; i < 40; i++ {
  1937. partitionCount := r.Intn(20)
  1938. partitions := make([]int32, partitionCount)
  1939. for j := 0; j < partitionCount; j++ {
  1940. partitions[j] = int32(j)
  1941. }
  1942. topics[fmt.Sprintf("topic%d", i)] = partitions
  1943. }
  1944. plan, _ := s.Plan(members, topics)
  1945. for i := 0; i < 200; i++ {
  1946. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
  1947. Topics: members[fmt.Sprintf("consumer%d", i)].Topics,
  1948. UserData: encodeSubscriberPlanWithGenerationForBenchmark(b, plan[fmt.Sprintf("consumer%d", i)], 1),
  1949. }
  1950. }
  1951. for i := 0; i < 1; i++ {
  1952. delete(members, fmt.Sprintf("consumer%d", i))
  1953. }
  1954. b.ResetTimer()
  1955. for n := 0; n < b.N; n++ {
  1956. if _, err := s.Plan(members, topics); err != nil {
  1957. b.Errorf("Error building plan in benchmark: %v", err)
  1958. }
  1959. }
  1960. }
  1961. func verifyPlanIsBalancedAndSticky(t *testing.T, s *stickyBalanceStrategy, members map[string]ConsumerGroupMemberMetadata, plan BalanceStrategyPlan, err error) {
  1962. if err != nil {
  1963. t.Errorf("stickyBalanceStrategy.Plan() error = %v", err)
  1964. return
  1965. }
  1966. if !s.movements.isSticky() {
  1967. t.Error("stickyBalanceStrategy.Plan() not sticky")
  1968. return
  1969. }
  1970. verifyValidityAndBalance(t, members, plan)
  1971. }
  1972. func verifyValidityAndBalance(t *testing.T, consumers map[string]ConsumerGroupMemberMetadata, plan BalanceStrategyPlan) {
  1973. size := len(consumers)
  1974. if size != len(plan) {
  1975. t.Errorf("Subscription size (%d) not equal to plan size (%d)", size, len(plan))
  1976. t.FailNow()
  1977. }
  1978. members := make([]string, size)
  1979. i := 0
  1980. for memberID := range consumers {
  1981. members[i] = memberID
  1982. i++
  1983. }
  1984. sort.Strings(members)
  1985. for i, memberID := range members {
  1986. for assignedTopic := range plan[memberID] {
  1987. found := false
  1988. for _, assignableTopic := range consumers[memberID].Topics {
  1989. if assignableTopic == assignableTopic {
  1990. found = true
  1991. break
  1992. }
  1993. }
  1994. if !found {
  1995. t.Errorf("Consumer %s had assigned topic %s that wasn't in the list of assignable topics", memberID, assignedTopic)
  1996. t.FailNow()
  1997. }
  1998. }
  1999. // skip last consumer
  2000. if i == len(members)-1 {
  2001. continue
  2002. }
  2003. consumerAssignments := make([]topicPartitionAssignment, 0)
  2004. for topic, partitions := range plan[memberID] {
  2005. for _, partition := range partitions {
  2006. consumerAssignments = append(consumerAssignments, topicPartitionAssignment{Topic: topic, Partition: partition})
  2007. }
  2008. }
  2009. for j := i + 1; j < size; j++ {
  2010. otherConsumer := members[j]
  2011. otherConsumerAssignments := make([]topicPartitionAssignment, 0)
  2012. for topic, partitions := range plan[otherConsumer] {
  2013. for _, partition := range partitions {
  2014. otherConsumerAssignments = append(otherConsumerAssignments, topicPartitionAssignment{Topic: topic, Partition: partition})
  2015. }
  2016. }
  2017. assignmentsIntersection := intersection(consumerAssignments, otherConsumerAssignments)
  2018. if len(assignmentsIntersection) > 0 {
  2019. t.Errorf("Consumers %s and %s have common partitions assigned to them: %v", memberID, otherConsumer, assignmentsIntersection)
  2020. t.FailNow()
  2021. }
  2022. if math.Abs(float64(len(consumerAssignments)-len(otherConsumerAssignments))) <= 1 {
  2023. continue
  2024. }
  2025. if len(consumerAssignments) > len(otherConsumerAssignments) {
  2026. for _, topic := range consumerAssignments {
  2027. if _, exists := plan[otherConsumer][topic.Topic]; exists {
  2028. t.Errorf("Some partitions can be moved from %s to %s to achieve a better balance, %s has %d assignments, and %s has %d assignments", otherConsumer, memberID, memberID, len(consumerAssignments), otherConsumer, len(otherConsumerAssignments))
  2029. t.FailNow()
  2030. }
  2031. }
  2032. }
  2033. if len(otherConsumerAssignments) > len(consumerAssignments) {
  2034. for _, topic := range otherConsumerAssignments {
  2035. if _, exists := plan[memberID][topic.Topic]; exists {
  2036. t.Errorf("Some partitions can be moved from %s to %s to achieve a better balance, %s has %d assignments, and %s has %d assignments", memberID, otherConsumer, otherConsumer, len(otherConsumerAssignments), memberID, len(consumerAssignments))
  2037. t.FailNow()
  2038. }
  2039. }
  2040. }
  2041. }
  2042. }
  2043. }
  2044. // Produces the intersection of two slices
  2045. // From https://github.com/juliangruber/go-intersect
  2046. func intersection(a interface{}, b interface{}) []interface{} {
  2047. set := make([]interface{}, 0)
  2048. hash := make(map[interface{}]bool)
  2049. av := reflect.ValueOf(a)
  2050. bv := reflect.ValueOf(b)
  2051. for i := 0; i < av.Len(); i++ {
  2052. el := av.Index(i).Interface()
  2053. hash[el] = true
  2054. }
  2055. for i := 0; i < bv.Len(); i++ {
  2056. el := bv.Index(i).Interface()
  2057. if _, found := hash[el]; found {
  2058. set = append(set, el)
  2059. }
  2060. }
  2061. return set
  2062. }
  2063. func encodeSubscriberPlan(t *testing.T, assignments map[string][]int32) []byte {
  2064. return encodeSubscriberPlanWithGeneration(t, assignments, defaultGeneration)
  2065. }
  2066. func encodeSubscriberPlanWithGeneration(t *testing.T, assignments map[string][]int32, generation int32) []byte {
  2067. userDataBytes, err := encode(&StickyAssignorUserDataV1{
  2068. Topics: assignments,
  2069. Generation: generation,
  2070. }, nil)
  2071. if err != nil {
  2072. t.Errorf("encodeSubscriberPlan error = %v", err)
  2073. t.FailNow()
  2074. }
  2075. return userDataBytes
  2076. }
  2077. func encodeSubscriberPlanWithGenerationForBenchmark(b *testing.B, assignments map[string][]int32, generation int32) []byte {
  2078. userDataBytes, err := encode(&StickyAssignorUserDataV1{
  2079. Topics: assignments,
  2080. Generation: generation,
  2081. }, nil)
  2082. if err != nil {
  2083. b.Errorf("encodeSubscriberPlan error = %v", err)
  2084. b.FailNow()
  2085. }
  2086. return userDataBytes
  2087. }
  2088. func encodeSubscriberPlanWithOldSchema(t *testing.T, assignments map[string][]int32) []byte {
  2089. userDataBytes, err := encode(&StickyAssignorUserDataV0{
  2090. Topics: assignments,
  2091. }, nil)
  2092. if err != nil {
  2093. t.Errorf("encodeSubscriberPlan error = %v", err)
  2094. t.FailNow()
  2095. }
  2096. return userDataBytes
  2097. }
  2098. // verify that the plan is fully balanced, assumes that all consumers can
  2099. // consume from the same set of topics
  2100. func verifyFullyBalanced(t *testing.T, plan BalanceStrategyPlan) {
  2101. min := math.MaxInt32
  2102. max := math.MinInt32
  2103. for _, topics := range plan {
  2104. assignedPartitionsCount := 0
  2105. for _, partitions := range topics {
  2106. assignedPartitionsCount += len(partitions)
  2107. }
  2108. if assignedPartitionsCount < min {
  2109. min = assignedPartitionsCount
  2110. }
  2111. if assignedPartitionsCount > max {
  2112. max = assignedPartitionsCount
  2113. }
  2114. }
  2115. if (max - min) > 1 {
  2116. t.Errorf("Plan partition assignment is not fully balanced: min=%d, max=%d", min, max)
  2117. }
  2118. }
  2119. func getRandomSublist(r *rand.Rand, s []string) []string {
  2120. howManyToRemove := r.Intn(len(s))
  2121. allEntriesMap := make(map[int]string)
  2122. for i, s := range s {
  2123. allEntriesMap[i] = s
  2124. }
  2125. for i := 0; i < howManyToRemove; i++ {
  2126. delete(allEntriesMap, r.Intn(len(allEntriesMap)))
  2127. }
  2128. subList := make([]string, len(allEntriesMap))
  2129. i := 0
  2130. for _, s := range allEntriesMap {
  2131. subList[i] = s
  2132. i++
  2133. }
  2134. return subList
  2135. }
  2136. func Test_sortPartitionsByPotentialConsumerAssignments(t *testing.T) {
  2137. type args struct {
  2138. partition2AllPotentialConsumers map[topicPartitionAssignment][]string
  2139. }
  2140. tests := []struct {
  2141. name string
  2142. args args
  2143. want []topicPartitionAssignment
  2144. }{
  2145. {
  2146. name: "Single topic partition",
  2147. args: args{
  2148. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  2149. {
  2150. Topic: "t1",
  2151. Partition: 0,
  2152. }: {"c1", "c2"},
  2153. },
  2154. },
  2155. want: []topicPartitionAssignment{
  2156. {
  2157. Topic: "t1",
  2158. Partition: 0,
  2159. },
  2160. },
  2161. },
  2162. {
  2163. name: "Multiple topic partitions with the same number of consumers but different topic names",
  2164. args: args{
  2165. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  2166. {
  2167. Topic: "t1",
  2168. Partition: 0,
  2169. }: {"c1", "c2"},
  2170. {
  2171. Topic: "t2",
  2172. Partition: 0,
  2173. }: {"c1", "c2"},
  2174. },
  2175. },
  2176. want: []topicPartitionAssignment{
  2177. {
  2178. Topic: "t1",
  2179. Partition: 0,
  2180. },
  2181. {
  2182. Topic: "t2",
  2183. Partition: 0,
  2184. },
  2185. },
  2186. },
  2187. {
  2188. name: "Multiple topic partitions with the same number of consumers and topic names",
  2189. args: args{
  2190. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  2191. {
  2192. Topic: "t1",
  2193. Partition: 0,
  2194. }: {"c1", "c2"},
  2195. {
  2196. Topic: "t1",
  2197. Partition: 1,
  2198. }: {"c1", "c2"},
  2199. },
  2200. },
  2201. want: []topicPartitionAssignment{
  2202. {
  2203. Topic: "t1",
  2204. Partition: 0,
  2205. },
  2206. {
  2207. Topic: "t1",
  2208. Partition: 1,
  2209. },
  2210. },
  2211. },
  2212. }
  2213. for _, tt := range tests {
  2214. t.Run(tt.name, func(t *testing.T) {
  2215. if got := sortPartitionsByPotentialConsumerAssignments(tt.args.partition2AllPotentialConsumers); !reflect.DeepEqual(got, tt.want) {
  2216. t.Errorf("sortPartitionsByPotentialConsumerAssignments() = %v, want %v", got, tt.want)
  2217. }
  2218. })
  2219. }
  2220. }