balance_strategy_test.go 68 KB


  1. package sarama
  2. import (
  3. "fmt"
  4. "math"
  5. "math/rand"
  6. "reflect"
  7. "sort"
  8. "testing"
  9. "time"
  10. )
  11. func TestBalanceStrategyRange(t *testing.T) {
  12. tests := []struct {
  13. members map[string][]string
  14. topics map[string][]int32
  15. expected BalanceStrategyPlan
  16. }{
  17. {
  18. members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
  19. topics: map[string][]int32{"T1": {0, 1, 2, 3}, "T2": {0, 1, 2, 3}},
  20. expected: BalanceStrategyPlan{
  21. "M1": map[string][]int32{"T1": {0, 1}, "T2": {2, 3}},
  22. "M2": map[string][]int32{"T1": {2, 3}, "T2": {0, 1}},
  23. },
  24. },
  25. {
  26. members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
  27. topics: map[string][]int32{"T1": {0, 1, 2}, "T2": {0, 1, 2}},
  28. expected: BalanceStrategyPlan{
  29. "M1": map[string][]int32{"T1": {0, 1}, "T2": {2}},
  30. "M2": map[string][]int32{"T1": {2}, "T2": {0, 1}},
  31. },
  32. },
  33. {
  34. members: map[string][]string{"M1": {"T1"}, "M2": {"T1", "T2"}},
  35. topics: map[string][]int32{"T1": {0, 1}, "T2": {0, 1}},
  36. expected: BalanceStrategyPlan{
  37. "M1": map[string][]int32{"T1": {0}},
  38. "M2": map[string][]int32{"T1": {1}, "T2": {0, 1}},
  39. },
  40. },
  41. }
  42. strategy := BalanceStrategyRange
  43. if strategy.Name() != "range" {
  44. t.Errorf("Unexpected stategy name\nexpected: range\nactual: %v", strategy.Name())
  45. }
  46. for _, test := range tests {
  47. members := make(map[string]ConsumerGroupMemberMetadata)
  48. for memberID, topics := range test.members {
  49. members[memberID] = ConsumerGroupMemberMetadata{Topics: topics}
  50. }
  51. actual, err := strategy.Plan(members, test.topics)
  52. if err != nil {
  53. t.Errorf("Unexpected error %v", err)
  54. } else if !reflect.DeepEqual(actual, test.expected) {
  55. t.Errorf("Plan does not match expectation\nexpected: %#v\nactual: %#v", test.expected, actual)
  56. }
  57. }
  58. }
  59. func TestBalanceStrategyRoundRobin(t *testing.T) {
  60. tests := []struct {
  61. members map[string][]string
  62. topics map[string][]int32
  63. expected BalanceStrategyPlan
  64. }{
  65. {
  66. members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
  67. topics: map[string][]int32{"T1": {0, 1, 2, 3}, "T2": {0, 1, 2, 3}},
  68. expected: BalanceStrategyPlan{
  69. "M1": map[string][]int32{"T1": {0, 2}, "T2": {1, 3}},
  70. "M2": map[string][]int32{"T1": {1, 3}, "T2": {0, 2}},
  71. },
  72. },
  73. {
  74. members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
  75. topics: map[string][]int32{"T1": {0, 1, 2}, "T2": {0, 1, 2}},
  76. expected: BalanceStrategyPlan{
  77. "M1": map[string][]int32{"T1": {0, 2}, "T2": {1}},
  78. "M2": map[string][]int32{"T1": {1}, "T2": {0, 2}},
  79. },
  80. },
  81. }
  82. strategy := BalanceStrategyRoundRobin
  83. if strategy.Name() != "roundrobin" {
  84. t.Errorf("Unexpected stategy name\nexpected: range\nactual: %v", strategy.Name())
  85. }
  86. for _, test := range tests {
  87. members := make(map[string]ConsumerGroupMemberMetadata)
  88. for memberID, topics := range test.members {
  89. members[memberID] = ConsumerGroupMemberMetadata{Topics: topics}
  90. }
  91. actual, err := strategy.Plan(members, test.topics)
  92. if err != nil {
  93. t.Errorf("Unexpected error %v", err)
  94. } else if !reflect.DeepEqual(actual, test.expected) {
  95. t.Errorf("Plan does not match expectation\nexpected: %#v\nactual: %#v", test.expected, actual)
  96. }
  97. }
  98. }
  99. func Test_deserializeTopicPartitionAssignment(t *testing.T) {
  100. type args struct {
  101. userDataBytes []byte
  102. }
  103. tests := []struct {
  104. name string
  105. args args
  106. want StickyAssignorUserData
  107. wantErr bool
  108. }{
  109. {
  110. name: "Nil userdata bytes",
  111. args: args{},
  112. want: &StickyAssignorUserDataV1{},
  113. },
  114. {
  115. name: "Non-empty invalid userdata bytes",
  116. args: args{
  117. userDataBytes: []byte{
  118. 0x00, 0x00,
  119. 0x00, 0x00, 0x00, 0x01,
  120. 0x00, 0x03, 'f', 'o', 'o',
  121. },
  122. },
  123. wantErr: true,
  124. },
  125. {
  126. name: "Valid v0 userdata bytes",
  127. args: args{
  128. userDataBytes: []byte{
  129. 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
  130. 0x33, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
  131. 0x05,
  132. },
  133. },
  134. want: &StickyAssignorUserDataV0{
  135. Topics: map[string][]int32{"t03": {5}},
  136. topicPartitions: []topicPartitionAssignment{
  137. {
  138. Topic: "t03",
  139. Partition: 5,
  140. },
  141. },
  142. },
  143. },
  144. {
  145. name: "Valid v1 userdata bytes",
  146. args: args{
  147. userDataBytes: []byte{
  148. 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
  149. 0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
  150. 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff,
  151. 0xff,
  152. },
  153. },
  154. want: &StickyAssignorUserDataV1{
  155. Topics: map[string][]int32{"t06": {0, 4}},
  156. Generation: -1,
  157. topicPartitions: []topicPartitionAssignment{
  158. {
  159. Topic: "t06",
  160. Partition: 0,
  161. },
  162. {
  163. Topic: "t06",
  164. Partition: 4,
  165. },
  166. },
  167. },
  168. },
  169. }
  170. for _, tt := range tests {
  171. t.Run(tt.name, func(t *testing.T) {
  172. got, err := deserializeTopicPartitionAssignment(tt.args.userDataBytes)
  173. if (err != nil) != tt.wantErr {
  174. t.Errorf("deserializeTopicPartitionAssignment() error = %v, wantErr %v", err, tt.wantErr)
  175. return
  176. }
  177. if !reflect.DeepEqual(got, tt.want) {
  178. t.Errorf("deserializeTopicPartitionAssignment() = %v, want %v", got, tt.want)
  179. }
  180. })
  181. }
  182. }
  183. func Test_prepopulateCurrentAssignments(t *testing.T) {
  184. type args struct {
  185. members map[string]ConsumerGroupMemberMetadata
  186. }
  187. tests := []struct {
  188. name string
  189. args args
  190. wantCurrentAssignments map[string][]topicPartitionAssignment
  191. wantPrevAssignments map[topicPartitionAssignment]consumerGenerationPair
  192. wantErr bool
  193. }{
  194. {
  195. name: "Empty map",
  196. wantCurrentAssignments: map[string][]topicPartitionAssignment{},
  197. wantPrevAssignments: map[topicPartitionAssignment]consumerGenerationPair{},
  198. },
  199. {
  200. name: "Single consumer",
  201. args: args{
  202. members: map[string]ConsumerGroupMemberMetadata{
  203. "c01": {
  204. Version: 2,
  205. UserData: []byte{
  206. 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
  207. 0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
  208. 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff,
  209. 0xff,
  210. },
  211. },
  212. },
  213. },
  214. wantCurrentAssignments: map[string][]topicPartitionAssignment{
  215. "c01": {
  216. {
  217. Topic: "t06",
  218. Partition: 0,
  219. },
  220. {
  221. Topic: "t06",
  222. Partition: 4,
  223. },
  224. },
  225. },
  226. wantPrevAssignments: map[topicPartitionAssignment]consumerGenerationPair{},
  227. },
  228. {
  229. name: "Duplicate consumer assignments in metadata",
  230. args: args{
  231. members: map[string]ConsumerGroupMemberMetadata{
  232. "c01": {
  233. Version: 2,
  234. UserData: []byte{
  235. 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
  236. 0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
  237. 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff,
  238. 0xff,
  239. },
  240. },
  241. "c02": {
  242. Version: 2,
  243. UserData: []byte{
  244. 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
  245. 0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
  246. 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff,
  247. 0xff,
  248. },
  249. },
  250. },
  251. },
  252. wantCurrentAssignments: map[string][]topicPartitionAssignment{
  253. "c01": {
  254. {
  255. Topic: "t06",
  256. Partition: 0,
  257. },
  258. {
  259. Topic: "t06",
  260. Partition: 4,
  261. },
  262. },
  263. },
  264. wantPrevAssignments: map[topicPartitionAssignment]consumerGenerationPair{},
  265. },
  266. {
  267. name: "Different generations (5, 6) of consumer assignments in metadata",
  268. args: args{
  269. members: map[string]ConsumerGroupMemberMetadata{
  270. "c01": {
  271. Version: 2,
  272. UserData: []byte{
  273. 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
  274. 0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
  275. 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00,
  276. 0x05,
  277. },
  278. },
  279. "c02": {
  280. Version: 2,
  281. UserData: []byte{
  282. 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
  283. 0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
  284. 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00,
  285. 0x06,
  286. },
  287. },
  288. },
  289. },
  290. wantCurrentAssignments: map[string][]topicPartitionAssignment{
  291. "c01": {
  292. {
  293. Topic: "t06",
  294. Partition: 0,
  295. },
  296. {
  297. Topic: "t06",
  298. Partition: 4,
  299. },
  300. },
  301. },
  302. wantPrevAssignments: map[topicPartitionAssignment]consumerGenerationPair{
  303. {
  304. Topic: "t06",
  305. Partition: 0,
  306. }: {
  307. Generation: 5,
  308. MemberID: "c01",
  309. },
  310. {
  311. Topic: "t06",
  312. Partition: 4,
  313. }: {
  314. Generation: 5,
  315. MemberID: "c01",
  316. },
  317. },
  318. },
  319. }
  320. for _, tt := range tests {
  321. t.Run(tt.name, func(t *testing.T) {
  322. _, gotPrevAssignments, err := prepopulateCurrentAssignments(tt.args.members)
  323. if (err != nil) != tt.wantErr {
  324. t.Errorf("prepopulateCurrentAssignments() error = %v, wantErr %v", err, tt.wantErr)
  325. }
  326. if !reflect.DeepEqual(gotPrevAssignments, tt.wantPrevAssignments) {
  327. t.Errorf("deserializeTopicPartitionAssignment() prevAssignments = %v, want %v", gotPrevAssignments, tt.wantPrevAssignments)
  328. }
  329. })
  330. }
  331. }
  332. func Test_areSubscriptionsIdentical(t *testing.T) {
  333. type args struct {
  334. partition2AllPotentialConsumers map[topicPartitionAssignment][]string
  335. consumer2AllPotentialPartitions map[string][]topicPartitionAssignment
  336. }
  337. tests := []struct {
  338. name string
  339. args args
  340. want bool
  341. }{
  342. {
  343. name: "Empty consumers and partitions",
  344. args: args{
  345. partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
  346. consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
  347. },
  348. want: true,
  349. },
  350. {
  351. name: "Topic partitions with identical consumer entries",
  352. args: args{
  353. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  354. {Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
  355. {Topic: "t1", Partition: 1}: {"c1", "c2", "c3"},
  356. {Topic: "t1", Partition: 2}: {"c1", "c2", "c3"},
  357. },
  358. consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
  359. },
  360. want: true,
  361. },
  362. {
  363. name: "Topic partitions with mixed up consumer entries",
  364. args: args{
  365. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  366. {Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
  367. {Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
  368. {Topic: "t1", Partition: 2}: {"c3", "c1", "c2"},
  369. },
  370. consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
  371. },
  372. want: true,
  373. },
  374. {
  375. name: "Topic partitions with different consumer entries",
  376. args: args{
  377. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  378. {Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
  379. {Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
  380. {Topic: "t1", Partition: 2}: {"cX", "c1", "c2"},
  381. },
  382. consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
  383. },
  384. want: false,
  385. },
  386. {
  387. name: "Topic partitions with different number of consumer entries",
  388. args: args{
  389. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  390. {Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
  391. {Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
  392. {Topic: "t1", Partition: 2}: {"c1", "c2"},
  393. },
  394. consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
  395. },
  396. want: false,
  397. },
  398. {
  399. name: "Consumers with identical topic partitions",
  400. args: args{
  401. partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
  402. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  403. "c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  404. "c2": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  405. "c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  406. },
  407. },
  408. want: true,
  409. },
  410. {
  411. name: "Consumer2 with mixed up consumer entries",
  412. args: args{
  413. partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
  414. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  415. "c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  416. "c2": {{Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}, {Topic: "t1", Partition: 0}},
  417. "c3": {{Topic: "t1", Partition: 2}, {Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}},
  418. },
  419. },
  420. want: true,
  421. },
  422. {
  423. name: "Consumer2 with different consumer entries",
  424. args: args{
  425. partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
  426. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  427. "c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  428. "c2": {{Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}, {Topic: "t1", Partition: 0}},
  429. "c3": {{Topic: "tX", Partition: 2}, {Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}},
  430. },
  431. },
  432. want: false,
  433. },
  434. {
  435. name: "Consumer2 with different number of consumer entries",
  436. args: args{
  437. partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
  438. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  439. "c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  440. "c2": {{Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}, {Topic: "t1", Partition: 0}},
  441. "c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}},
  442. },
  443. },
  444. want: false,
  445. },
  446. }
  447. for _, tt := range tests {
  448. t.Run(tt.name, func(t *testing.T) {
  449. if got := areSubscriptionsIdentical(tt.args.partition2AllPotentialConsumers, tt.args.consumer2AllPotentialPartitions); got != tt.want {
  450. t.Errorf("areSubscriptionsIdentical() = %v, want %v", got, tt.want)
  451. }
  452. })
  453. }
  454. }
  455. func Test_sortMemberIDsByPartitionAssignments(t *testing.T) {
  456. type args struct {
  457. assignments map[string][]topicPartitionAssignment
  458. }
  459. tests := []struct {
  460. name string
  461. args args
  462. want []string
  463. }{
  464. {
  465. name: "Null assignments",
  466. want: make([]string, 0),
  467. },
  468. {
  469. name: "Single assignment",
  470. args: args{
  471. assignments: map[string][]topicPartitionAssignment{
  472. "c1": {
  473. {Topic: "t1", Partition: 0},
  474. {Topic: "t1", Partition: 1},
  475. {Topic: "t1", Partition: 2},
  476. },
  477. },
  478. },
  479. want: []string{"c1"},
  480. },
  481. {
  482. name: "Multiple assignments with different partition counts",
  483. args: args{
  484. assignments: map[string][]topicPartitionAssignment{
  485. "c1": {
  486. {Topic: "t1", Partition: 0},
  487. },
  488. "c2": {
  489. {Topic: "t1", Partition: 1},
  490. {Topic: "t1", Partition: 2},
  491. },
  492. "c3": {
  493. {Topic: "t1", Partition: 3},
  494. {Topic: "t1", Partition: 4},
  495. {Topic: "t1", Partition: 5},
  496. },
  497. },
  498. },
  499. want: []string{"c1", "c2", "c3"},
  500. },
  501. }
  502. for _, tt := range tests {
  503. t.Run(tt.name, func(t *testing.T) {
  504. if got := sortMemberIDsByPartitionAssignments(tt.args.assignments); !reflect.DeepEqual(got, tt.want) {
  505. t.Errorf("sortMemberIDsByPartitionAssignments() = %v, want %v", got, tt.want)
  506. }
  507. })
  508. }
  509. }
  510. func Test_sortPartitions(t *testing.T) {
  511. type args struct {
  512. currentAssignment map[string][]topicPartitionAssignment
  513. partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair
  514. isFreshAssignment bool
  515. partition2AllPotentialConsumers map[topicPartitionAssignment][]string
  516. consumer2AllPotentialPartitions map[string][]topicPartitionAssignment
  517. }
  518. tests := []struct {
  519. name string
  520. args args
  521. want []topicPartitionAssignment
  522. }{
  523. {
  524. name: "Empty everything",
  525. want: make([]topicPartitionAssignment, 0),
  526. },
  527. {
  528. name: "Base case",
  529. args: args{
  530. currentAssignment: map[string][]topicPartitionAssignment{
  531. "c1": {{Topic: "t1", Partition: 0}},
  532. "c2": {{Topic: "t1", Partition: 1}},
  533. "c3": {{Topic: "t1", Partition: 2}},
  534. },
  535. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  536. "c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  537. "c2": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  538. "c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  539. },
  540. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  541. {Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
  542. {Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
  543. {Topic: "t1", Partition: 2}: {"c3", "c1", "c2"},
  544. },
  545. },
  546. },
  547. {
  548. name: "Partitions assigned to a different consumer last time",
  549. args: args{
  550. currentAssignment: map[string][]topicPartitionAssignment{
  551. "c1": {{Topic: "t1", Partition: 0}},
  552. },
  553. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  554. "c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  555. "c2": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  556. "c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  557. },
  558. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  559. {Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
  560. {Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
  561. {Topic: "t1", Partition: 2}: {"c3", "c1", "c2"},
  562. },
  563. partitionsWithADifferentPreviousAssignment: map[topicPartitionAssignment]consumerGenerationPair{
  564. {Topic: "t1", Partition: 0}: {Generation: 1, MemberID: "c2"},
  565. },
  566. },
  567. },
  568. {
  569. name: "Partitions assigned to a different consumer last time",
  570. args: args{
  571. currentAssignment: map[string][]topicPartitionAssignment{
  572. "c1": {{Topic: "t1", Partition: 0}},
  573. "c2": {{Topic: "t1", Partition: 1}},
  574. },
  575. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  576. "c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  577. "c2": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  578. "c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  579. },
  580. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  581. {Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
  582. {Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
  583. {Topic: "t1", Partition: 2}: {"c3", "c1", "c2"},
  584. },
  585. partitionsWithADifferentPreviousAssignment: map[topicPartitionAssignment]consumerGenerationPair{
  586. {Topic: "t1", Partition: 0}: {Generation: 1, MemberID: "c2"},
  587. },
  588. },
  589. },
  590. {
  591. name: "Fresh assignment",
  592. args: args{
  593. isFreshAssignment: true,
  594. currentAssignment: map[string][]topicPartitionAssignment{},
  595. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  596. "c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  597. "c2": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  598. "c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
  599. },
  600. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  601. {Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
  602. {Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
  603. {Topic: "t1", Partition: 2}: {"c3", "c1", "c2"},
  604. },
  605. partitionsWithADifferentPreviousAssignment: map[topicPartitionAssignment]consumerGenerationPair{
  606. {Topic: "t1", Partition: 0}: {Generation: 1, MemberID: "c2"},
  607. },
  608. },
  609. },
  610. }
  611. for _, tt := range tests {
  612. t.Run(tt.name, func(t *testing.T) {
  613. got := sortPartitions(tt.args.currentAssignment, tt.args.partitionsWithADifferentPreviousAssignment, tt.args.isFreshAssignment, tt.args.partition2AllPotentialConsumers, tt.args.consumer2AllPotentialPartitions)
  614. if tt.want != nil && !reflect.DeepEqual(got, tt.want) {
  615. t.Errorf("sortPartitions() = %v, want %v", got, tt.want)
  616. }
  617. })
  618. }
  619. }
  620. func Test_filterAssignedPartitions(t *testing.T) {
  621. type args struct {
  622. currentAssignment map[string][]topicPartitionAssignment
  623. partition2AllPotentialConsumers map[topicPartitionAssignment][]string
  624. }
  625. tests := []struct {
  626. name string
  627. args args
  628. want map[string][]topicPartitionAssignment
  629. }{
  630. {
  631. name: "All partitions accounted for",
  632. args: args{
  633. currentAssignment: map[string][]topicPartitionAssignment{
  634. "c1": {{Topic: "t1", Partition: 0}},
  635. "c2": {{Topic: "t1", Partition: 1}},
  636. },
  637. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  638. {Topic: "t1", Partition: 0}: {"c1"},
  639. {Topic: "t1", Partition: 1}: {"c2"},
  640. },
  641. },
  642. want: map[string][]topicPartitionAssignment{
  643. "c1": {{Topic: "t1", Partition: 0}},
  644. "c2": {{Topic: "t1", Partition: 1}},
  645. },
  646. },
  647. {
  648. name: "One consumer using an unrecognized partition",
  649. args: args{
  650. currentAssignment: map[string][]topicPartitionAssignment{
  651. "c1": {{Topic: "t1", Partition: 0}},
  652. "c2": {{Topic: "t1", Partition: 1}},
  653. },
  654. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  655. {Topic: "t1", Partition: 0}: {"c1"},
  656. },
  657. },
  658. want: map[string][]topicPartitionAssignment{
  659. "c1": {{Topic: "t1", Partition: 0}},
  660. "c2": {},
  661. },
  662. },
  663. {
  664. name: "Interleaved consumer removal",
  665. args: args{
  666. currentAssignment: map[string][]topicPartitionAssignment{
  667. "c1": {{Topic: "t1", Partition: 0}},
  668. "c2": {{Topic: "t1", Partition: 1}},
  669. "c3": {{Topic: "t1", Partition: 2}},
  670. },
  671. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  672. {Topic: "t1", Partition: 0}: {"c1"},
  673. {Topic: "t1", Partition: 2}: {"c3"},
  674. },
  675. },
  676. want: map[string][]topicPartitionAssignment{
  677. "c1": {{Topic: "t1", Partition: 0}},
  678. "c2": {},
  679. "c3": {{Topic: "t1", Partition: 2}},
  680. },
  681. },
  682. }
  683. for _, tt := range tests {
  684. t.Run(tt.name, func(t *testing.T) {
  685. if got := filterAssignedPartitions(tt.args.currentAssignment, tt.args.partition2AllPotentialConsumers); !reflect.DeepEqual(got, tt.want) {
  686. t.Errorf("filterAssignedPartitions() = %v, want %v", got, tt.want)
  687. }
  688. })
  689. }
  690. }
  691. func Test_canConsumerParticipateInReassignment(t *testing.T) {
  692. type args struct {
  693. memberID string
  694. currentAssignment map[string][]topicPartitionAssignment
  695. consumer2AllPotentialPartitions map[string][]topicPartitionAssignment
  696. partition2AllPotentialConsumers map[topicPartitionAssignment][]string
  697. }
  698. tests := []struct {
  699. name string
  700. args args
  701. want bool
  702. }{
  703. {
  704. name: "Consumer has been assigned partitions not available to it",
  705. args: args{
  706. memberID: "c1",
  707. currentAssignment: map[string][]topicPartitionAssignment{
  708. "c1": {
  709. {Topic: "t1", Partition: 0},
  710. {Topic: "t1", Partition: 1},
  711. {Topic: "t1", Partition: 2},
  712. },
  713. "c2": {},
  714. },
  715. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  716. "c1": {
  717. {Topic: "t1", Partition: 0},
  718. {Topic: "t1", Partition: 1},
  719. },
  720. "c2": {
  721. {Topic: "t1", Partition: 0},
  722. {Topic: "t1", Partition: 1},
  723. {Topic: "t1", Partition: 2},
  724. },
  725. },
  726. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  727. {Topic: "t1", Partition: 0}: {"c1", "c2"},
  728. {Topic: "t1", Partition: 1}: {"c1", "c2"},
  729. {Topic: "t1", Partition: 2}: {"c2"},
  730. },
  731. },
  732. want: true,
  733. },
  734. {
  735. name: "Consumer has been assigned all available partitions",
  736. args: args{
  737. memberID: "c1",
  738. currentAssignment: map[string][]topicPartitionAssignment{
  739. "c1": {
  740. {Topic: "t1", Partition: 0},
  741. {Topic: "t1", Partition: 1},
  742. },
  743. },
  744. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  745. "c1": {
  746. {Topic: "t1", Partition: 0},
  747. {Topic: "t1", Partition: 1},
  748. },
  749. },
  750. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  751. {Topic: "t1", Partition: 0}: {"c1"},
  752. {Topic: "t1", Partition: 1}: {"c1"},
  753. },
  754. },
  755. want: false,
  756. },
  757. {
  758. name: "Consumer has not been assigned all available partitions",
  759. args: args{
  760. memberID: "c1",
  761. currentAssignment: map[string][]topicPartitionAssignment{
  762. "c1": {
  763. {Topic: "t1", Partition: 0},
  764. {Topic: "t1", Partition: 1},
  765. },
  766. },
  767. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  768. "c1": {
  769. {Topic: "t1", Partition: 0},
  770. {Topic: "t1", Partition: 1},
  771. {Topic: "t1", Partition: 2},
  772. },
  773. },
  774. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  775. {Topic: "t1", Partition: 0}: {"c1"},
  776. {Topic: "t1", Partition: 1}: {"c1"},
  777. {Topic: "t1", Partition: 2}: {"c1"},
  778. },
  779. },
  780. want: true,
  781. },
  782. }
  783. for _, tt := range tests {
  784. t.Run(tt.name, func(t *testing.T) {
  785. if got := canConsumerParticipateInReassignment(tt.args.memberID, tt.args.currentAssignment, tt.args.consumer2AllPotentialPartitions, tt.args.partition2AllPotentialConsumers); got != tt.want {
  786. t.Errorf("canConsumerParticipateInReassignment() = %v, want %v", got, tt.want)
  787. }
  788. })
  789. }
  790. }
  791. func Test_removeTopicPartitionFromMemberAssignments(t *testing.T) {
  792. type args struct {
  793. assignments []topicPartitionAssignment
  794. topic topicPartitionAssignment
  795. }
  796. tests := []struct {
  797. name string
  798. args args
  799. want []topicPartitionAssignment
  800. }{
  801. {
  802. name: "Empty",
  803. args: args{
  804. assignments: make([]topicPartitionAssignment, 0),
  805. topic: topicPartitionAssignment{Topic: "t1", Partition: 0},
  806. },
  807. want: make([]topicPartitionAssignment, 0),
  808. },
  809. {
  810. name: "Remove first entry",
  811. args: args{
  812. assignments: []topicPartitionAssignment{
  813. {Topic: "t1", Partition: 0},
  814. {Topic: "t1", Partition: 1},
  815. {Topic: "t1", Partition: 2},
  816. },
  817. topic: topicPartitionAssignment{Topic: "t1", Partition: 0},
  818. },
  819. want: []topicPartitionAssignment{
  820. {Topic: "t1", Partition: 1},
  821. {Topic: "t1", Partition: 2},
  822. },
  823. },
  824. {
  825. name: "Remove middle entry",
  826. args: args{
  827. assignments: []topicPartitionAssignment{
  828. {Topic: "t1", Partition: 0},
  829. {Topic: "t1", Partition: 1},
  830. {Topic: "t1", Partition: 2},
  831. },
  832. topic: topicPartitionAssignment{Topic: "t1", Partition: 1},
  833. },
  834. want: []topicPartitionAssignment{
  835. {Topic: "t1", Partition: 0},
  836. {Topic: "t1", Partition: 2},
  837. },
  838. },
  839. {
  840. name: "Remove last entry",
  841. args: args{
  842. assignments: []topicPartitionAssignment{
  843. {Topic: "t1", Partition: 0},
  844. {Topic: "t1", Partition: 1},
  845. {Topic: "t1", Partition: 2},
  846. },
  847. topic: topicPartitionAssignment{Topic: "t1", Partition: 2},
  848. },
  849. want: []topicPartitionAssignment{
  850. {Topic: "t1", Partition: 0},
  851. {Topic: "t1", Partition: 1},
  852. },
  853. },
  854. }
  855. for _, tt := range tests {
  856. t.Run(tt.name, func(t *testing.T) {
  857. if got := removeTopicPartitionFromMemberAssignments(tt.args.assignments, tt.args.topic); !reflect.DeepEqual(got, tt.want) {
  858. t.Errorf("removeTopicPartitionFromMemberAssignments() = %v, want %v", got, tt.want)
  859. }
  860. })
  861. }
  862. }
  863. func Test_assignPartition(t *testing.T) {
  864. type args struct {
  865. partition topicPartitionAssignment
  866. sortedCurrentSubscriptions []string
  867. currentAssignment map[string][]topicPartitionAssignment
  868. consumer2AllPotentialPartitions map[string][]topicPartitionAssignment
  869. currentPartitionConsumer map[topicPartitionAssignment]string
  870. }
  871. tests := []struct {
  872. name string
  873. args args
  874. want []string
  875. wantCurrentAssignment map[string][]topicPartitionAssignment
  876. wantCurrentPartitionConsumer map[topicPartitionAssignment]string
  877. }{
  878. {
  879. name: "Base",
  880. args: args{
  881. partition: topicPartitionAssignment{Topic: "t1", Partition: 2},
  882. sortedCurrentSubscriptions: []string{"c3", "c1", "c2"},
  883. currentAssignment: map[string][]topicPartitionAssignment{
  884. "c1": {
  885. {Topic: "t1", Partition: 0},
  886. },
  887. "c2": {
  888. {Topic: "t1", Partition: 1},
  889. },
  890. "c3": {},
  891. },
  892. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  893. "c1": {
  894. {Topic: "t1", Partition: 0},
  895. },
  896. "c2": {
  897. {Topic: "t1", Partition: 1},
  898. },
  899. "c3": {
  900. {Topic: "t1", Partition: 2},
  901. },
  902. },
  903. currentPartitionConsumer: map[topicPartitionAssignment]string{
  904. {Topic: "t1", Partition: 0}: "c1",
  905. {Topic: "t1", Partition: 1}: "c2",
  906. },
  907. },
  908. want: []string{"c1", "c2", "c3"},
  909. wantCurrentAssignment: map[string][]topicPartitionAssignment{
  910. "c1": {
  911. {Topic: "t1", Partition: 0},
  912. },
  913. "c2": {
  914. {Topic: "t1", Partition: 1},
  915. },
  916. "c3": {
  917. {Topic: "t1", Partition: 2},
  918. },
  919. },
  920. wantCurrentPartitionConsumer: map[topicPartitionAssignment]string{
  921. {Topic: "t1", Partition: 0}: "c1",
  922. {Topic: "t1", Partition: 1}: "c2",
  923. {Topic: "t1", Partition: 2}: "c3",
  924. },
  925. },
  926. {
  927. name: "Unassignable Partition",
  928. args: args{
  929. partition: topicPartitionAssignment{Topic: "t1", Partition: 3},
  930. sortedCurrentSubscriptions: []string{"c3", "c1", "c2"},
  931. currentAssignment: map[string][]topicPartitionAssignment{
  932. "c1": {
  933. {Topic: "t1", Partition: 0},
  934. },
  935. "c2": {
  936. {Topic: "t1", Partition: 1},
  937. },
  938. "c3": {},
  939. },
  940. consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
  941. "c1": {
  942. {Topic: "t1", Partition: 0},
  943. },
  944. "c2": {
  945. {Topic: "t1", Partition: 1},
  946. },
  947. "c3": {
  948. {Topic: "t1", Partition: 2},
  949. },
  950. },
  951. currentPartitionConsumer: map[topicPartitionAssignment]string{
  952. {Topic: "t1", Partition: 0}: "c1",
  953. {Topic: "t1", Partition: 1}: "c2",
  954. },
  955. },
  956. want: []string{"c3", "c1", "c2"},
  957. wantCurrentAssignment: map[string][]topicPartitionAssignment{
  958. "c1": {
  959. {Topic: "t1", Partition: 0},
  960. },
  961. "c2": {
  962. {Topic: "t1", Partition: 1},
  963. },
  964. "c3": {},
  965. },
  966. wantCurrentPartitionConsumer: map[topicPartitionAssignment]string{
  967. {Topic: "t1", Partition: 0}: "c1",
  968. {Topic: "t1", Partition: 1}: "c2",
  969. },
  970. },
  971. }
  972. for _, tt := range tests {
  973. t.Run(tt.name, func(t *testing.T) {
  974. if got := assignPartition(tt.args.partition, tt.args.sortedCurrentSubscriptions, tt.args.currentAssignment, tt.args.consumer2AllPotentialPartitions, tt.args.currentPartitionConsumer); !reflect.DeepEqual(got, tt.want) {
  975. t.Errorf("assignPartition() = %v, want %v", got, tt.want)
  976. }
  977. if !reflect.DeepEqual(tt.args.currentAssignment, tt.wantCurrentAssignment) {
  978. t.Errorf("assignPartition() currentAssignment = %v, want %v", tt.args.currentAssignment, tt.wantCurrentAssignment)
  979. }
  980. if !reflect.DeepEqual(tt.args.currentPartitionConsumer, tt.wantCurrentPartitionConsumer) {
  981. t.Errorf("assignPartition() currentPartitionConsumer = %v, want %v", tt.args.currentPartitionConsumer, tt.wantCurrentPartitionConsumer)
  982. }
  983. })
  984. }
  985. }
  986. func Test_stickyBalanceStrategy_Plan(t *testing.T) {
  987. type args struct {
  988. members map[string]ConsumerGroupMemberMetadata
  989. topics map[string][]int32
  990. }
  991. tests := []struct {
  992. name string
  993. s *stickyBalanceStrategy
  994. args args
  995. }{
  996. {
  997. name: "One consumer with no topics",
  998. args: args{
  999. members: map[string]ConsumerGroupMemberMetadata{
  1000. "consumer": {},
  1001. },
  1002. topics: make(map[string][]int32),
  1003. },
  1004. },
  1005. {
  1006. name: "One consumer with non-existent topic",
  1007. args: args{
  1008. members: map[string]ConsumerGroupMemberMetadata{
  1009. "consumer": {
  1010. Topics: []string{"topic"},
  1011. },
  1012. },
  1013. topics: map[string][]int32{
  1014. "topic": make([]int32, 0),
  1015. },
  1016. },
  1017. },
  1018. {
  1019. name: "One consumer with one topic",
  1020. args: args{
  1021. members: map[string]ConsumerGroupMemberMetadata{
  1022. "consumer": {
  1023. Topics: []string{"topic"},
  1024. },
  1025. },
  1026. topics: map[string][]int32{
  1027. "topic": {0, 1, 2},
  1028. },
  1029. },
  1030. },
  1031. {
  1032. name: "Only assigns partitions from subscribed topics",
  1033. args: args{
  1034. members: map[string]ConsumerGroupMemberMetadata{
  1035. "consumer": {
  1036. Topics: []string{"topic"},
  1037. },
  1038. },
  1039. topics: map[string][]int32{
  1040. "topic": {0, 1, 2},
  1041. "other": {0, 1, 2},
  1042. },
  1043. },
  1044. },
  1045. {
  1046. name: "One consumer with multiple topics",
  1047. args: args{
  1048. members: map[string]ConsumerGroupMemberMetadata{
  1049. "consumer": {
  1050. Topics: []string{"topic1", "topic2"},
  1051. },
  1052. },
  1053. topics: map[string][]int32{
  1054. "topic1": {0},
  1055. "topic2": {0, 1},
  1056. },
  1057. },
  1058. },
  1059. {
  1060. name: "Two consumers with one topic and one partition",
  1061. args: args{
  1062. members: map[string]ConsumerGroupMemberMetadata{
  1063. "consumer1": {
  1064. Topics: []string{"topic"},
  1065. },
  1066. "consumer2": {
  1067. Topics: []string{"topic"},
  1068. },
  1069. },
  1070. topics: map[string][]int32{
  1071. "topic": {0},
  1072. },
  1073. },
  1074. },
  1075. {
  1076. name: "Two consumers with one topic and two partitions",
  1077. args: args{
  1078. members: map[string]ConsumerGroupMemberMetadata{
  1079. "consumer1": {
  1080. Topics: []string{"topic"},
  1081. },
  1082. "consumer2": {
  1083. Topics: []string{"topic"},
  1084. },
  1085. },
  1086. topics: map[string][]int32{
  1087. "topic": {0, 1},
  1088. },
  1089. },
  1090. },
  1091. {
  1092. name: "Multiple consumers with mixed topic subscriptions",
  1093. args: args{
  1094. members: map[string]ConsumerGroupMemberMetadata{
  1095. "consumer1": {
  1096. Topics: []string{"topic1"},
  1097. },
  1098. "consumer2": {
  1099. Topics: []string{"topic1", "topic2"},
  1100. },
  1101. "consumer3": {
  1102. Topics: []string{"topic1"},
  1103. },
  1104. },
  1105. topics: map[string][]int32{
  1106. "topic1": {0, 1, 2},
  1107. "topic2": {0, 1},
  1108. },
  1109. },
  1110. },
  1111. {
  1112. name: "Two consumers with two topics and six partitions",
  1113. args: args{
  1114. members: map[string]ConsumerGroupMemberMetadata{
  1115. "consumer1": {
  1116. Topics: []string{"topic1", "topic2"},
  1117. },
  1118. "consumer2": {
  1119. Topics: []string{"topic1", "topic2"},
  1120. },
  1121. },
  1122. topics: map[string][]int32{
  1123. "topic1": {0, 1, 2},
  1124. "topic2": {0, 1, 2},
  1125. },
  1126. },
  1127. },
  1128. {
  1129. name: "Three consumers (two old, one new) with one topic and twelve partitions",
  1130. args: args{
  1131. members: map[string]ConsumerGroupMemberMetadata{
  1132. "consumer1": {
  1133. Topics: []string{"topic1"},
  1134. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {4, 11, 8, 5, 9, 2}}, 1),
  1135. },
  1136. "consumer2": {
  1137. Topics: []string{"topic1"},
  1138. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {1, 3, 0, 7, 10, 6}}, 1),
  1139. },
  1140. "consumer3": {
  1141. Topics: []string{"topic1"},
  1142. },
  1143. },
  1144. topics: map[string][]int32{
  1145. "topic1": {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
  1146. },
  1147. },
  1148. },
  1149. {
  1150. name: "Three consumers (two old, one new) with one topic and 13 partitions",
  1151. args: args{
  1152. members: map[string]ConsumerGroupMemberMetadata{
  1153. "consumer1": {
  1154. Topics: []string{"topic1"},
  1155. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {4, 11, 8, 5, 9, 2, 6}}, 1),
  1156. },
  1157. "consumer2": {
  1158. Topics: []string{"topic1"},
  1159. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {1, 3, 0, 7, 10, 12}}, 1),
  1160. },
  1161. "consumer3": {
  1162. Topics: []string{"topic1"},
  1163. },
  1164. },
  1165. topics: map[string][]int32{
  1166. "topic1": {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
  1167. },
  1168. },
  1169. },
  1170. {
  1171. name: "One consumer that is no longer subscribed to a topic that it had previously been consuming from",
  1172. args: args{
  1173. members: map[string]ConsumerGroupMemberMetadata{
  1174. "consumer1": {
  1175. Topics: []string{"topic2"},
  1176. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0}}, 1),
  1177. },
  1178. },
  1179. topics: map[string][]int32{
  1180. "topic1": {0},
  1181. "topic2": {0},
  1182. },
  1183. },
  1184. },
  1185. {
  1186. name: "Two consumers where one is no longer interested in consuming from a topic that it had been consuming from",
  1187. args: args{
  1188. members: map[string]ConsumerGroupMemberMetadata{
  1189. "consumer1": {
  1190. Topics: []string{"topic2"},
  1191. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0}}, 1),
  1192. },
  1193. "consumer2": {
  1194. Topics: []string{"topic1", "topic2"},
  1195. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {1}}, 1),
  1196. },
  1197. },
  1198. topics: map[string][]int32{
  1199. "topic1": {0, 1},
  1200. "topic2": {0, 1},
  1201. },
  1202. },
  1203. },
  1204. }
  1205. for _, tt := range tests {
  1206. t.Run(tt.name, func(t *testing.T) {
  1207. s := &stickyBalanceStrategy{}
  1208. plan, err := s.Plan(tt.args.members, tt.args.topics)
  1209. verifyPlanIsBalancedAndSticky(t, s, tt.args.members, plan, err)
  1210. verifyFullyBalanced(t, plan)
  1211. })
  1212. }
  1213. }
  1214. func Test_stickyBalanceStrategy_Plan_KIP54_ExampleOne(t *testing.T) {
  1215. s := &stickyBalanceStrategy{}
  1216. // PLAN 1
  1217. members := map[string]ConsumerGroupMemberMetadata{
  1218. "consumer1": {
  1219. Topics: []string{"topic1", "topic2", "topic3", "topic4"},
  1220. },
  1221. "consumer2": {
  1222. Topics: []string{"topic1", "topic2", "topic3", "topic4"},
  1223. },
  1224. "consumer3": {
  1225. Topics: []string{"topic1", "topic2", "topic3", "topic4"},
  1226. },
  1227. }
  1228. topics := map[string][]int32{
  1229. "topic1": {0, 1},
  1230. "topic2": {0, 1},
  1231. "topic3": {0, 1},
  1232. "topic4": {0, 1},
  1233. }
  1234. plan1, err := s.Plan(members, topics)
  1235. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1236. verifyFullyBalanced(t, plan1)
  1237. // PLAN 2
  1238. delete(members, "consumer1")
  1239. members["consumer2"] = ConsumerGroupMemberMetadata{
  1240. Topics: []string{"topic1", "topic2", "topic3", "topic4"},
  1241. UserData: encodeSubscriberPlan(t, plan1["consumer2"]),
  1242. }
  1243. members["consumer3"] = ConsumerGroupMemberMetadata{
  1244. Topics: []string{"topic1", "topic2", "topic3", "topic4"},
  1245. UserData: encodeSubscriberPlan(t, plan1["consumer3"]),
  1246. }
  1247. plan2, err := s.Plan(members, topics)
  1248. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1249. verifyFullyBalanced(t, plan2)
  1250. }
  1251. func Test_stickyBalanceStrategy_Plan_KIP54_ExampleTwo(t *testing.T) {
  1252. s := &stickyBalanceStrategy{}
  1253. // PLAN 1
  1254. members := map[string]ConsumerGroupMemberMetadata{
  1255. "consumer1": {
  1256. Topics: []string{"topic1"},
  1257. },
  1258. "consumer2": {
  1259. Topics: []string{"topic1", "topic2"},
  1260. },
  1261. "consumer3": {
  1262. Topics: []string{"topic1", "topic2", "topic3"},
  1263. },
  1264. }
  1265. topics := map[string][]int32{
  1266. "topic1": {0},
  1267. "topic2": {0, 1},
  1268. "topic3": {0, 1, 2},
  1269. }
  1270. plan1, err := s.Plan(members, topics)
  1271. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1272. if len(plan1["consumer1"]["topic1"]) != 1 || len(plan1["consumer2"]["topic2"]) != 2 || len(plan1["consumer3"]["topic3"]) != 3 {
  1273. t.Error("Incorrect distribution of topic partition assignments")
  1274. }
  1275. // PLAN 2
  1276. delete(members, "consumer1")
  1277. members["consumer2"] = ConsumerGroupMemberMetadata{
  1278. Topics: members["consumer2"].Topics,
  1279. UserData: encodeSubscriberPlan(t, plan1["consumer2"]),
  1280. }
  1281. members["consumer3"] = ConsumerGroupMemberMetadata{
  1282. Topics: members["consumer3"].Topics,
  1283. UserData: encodeSubscriberPlan(t, plan1["consumer3"]),
  1284. }
  1285. plan2, err := s.Plan(members, topics)
  1286. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1287. verifyFullyBalanced(t, plan2)
  1288. if len(plan2["consumer2"]["topic1"]) != 1 || len(plan2["consumer2"]["topic2"]) != 2 || len(plan2["consumer3"]["topic3"]) != 3 {
  1289. t.Error("Incorrect distribution of topic partition assignments")
  1290. }
  1291. }
  1292. func Test_stickyBalanceStrategy_Plan_KIP54_ExampleThree(t *testing.T) {
  1293. s := &stickyBalanceStrategy{}
  1294. topicNames := []string{"topic1", "topic2"}
  1295. // PLAN 1
  1296. members := map[string]ConsumerGroupMemberMetadata{
  1297. "consumer1": {
  1298. Topics: topicNames,
  1299. },
  1300. "consumer2": {
  1301. Topics: topicNames,
  1302. },
  1303. }
  1304. topics := map[string][]int32{
  1305. "topic1": {0, 1},
  1306. "topic2": {0, 1},
  1307. }
  1308. plan1, err := s.Plan(members, topics)
  1309. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1310. // PLAN 2
  1311. members["consumer1"] = ConsumerGroupMemberMetadata{
  1312. Topics: topicNames,
  1313. }
  1314. members["consumer2"] = ConsumerGroupMemberMetadata{
  1315. Topics: topicNames,
  1316. UserData: encodeSubscriberPlan(t, plan1["consumer2"]),
  1317. }
  1318. members["consumer3"] = ConsumerGroupMemberMetadata{
  1319. Topics: topicNames,
  1320. UserData: encodeSubscriberPlan(t, plan1["consumer3"]),
  1321. }
  1322. plan2, err := s.Plan(members, topics)
  1323. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1324. verifyFullyBalanced(t, plan2)
  1325. }
  1326. func Test_stickyBalanceStrategy_Plan_AddRemoveConsumerOneTopic(t *testing.T) {
  1327. s := &stickyBalanceStrategy{}
  1328. // PLAN 1
  1329. members := map[string]ConsumerGroupMemberMetadata{
  1330. "consumer1": {
  1331. Topics: []string{"topic"},
  1332. },
  1333. }
  1334. topics := map[string][]int32{
  1335. "topic": {0, 1, 2},
  1336. }
  1337. plan1, err := s.Plan(members, topics)
  1338. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1339. // PLAN 2
  1340. members["consumer1"] = ConsumerGroupMemberMetadata{
  1341. Topics: []string{"topic"},
  1342. UserData: encodeSubscriberPlan(t, plan1["consumer1"]),
  1343. }
  1344. members["consumer2"] = ConsumerGroupMemberMetadata{
  1345. Topics: []string{"topic"},
  1346. }
  1347. plan2, err := s.Plan(members, topics)
  1348. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1349. // PLAN 3
  1350. delete(members, "consumer1")
  1351. members["consumer2"] = ConsumerGroupMemberMetadata{
  1352. Topics: []string{"topic"},
  1353. UserData: encodeSubscriberPlan(t, plan2["consumer2"]),
  1354. }
  1355. plan3, err := s.Plan(members, topics)
  1356. verifyPlanIsBalancedAndSticky(t, s, members, plan3, err)
  1357. }
  1358. func Test_stickyBalanceStrategy_Plan_PoorRoundRobinAssignmentScenario(t *testing.T) {
  1359. s := &stickyBalanceStrategy{}
  1360. // PLAN 1
  1361. members := map[string]ConsumerGroupMemberMetadata{
  1362. "consumer1": {
  1363. Topics: []string{"topic1", "topic2", "topic3", "topic4", "topic5"},
  1364. },
  1365. "consumer2": {
  1366. Topics: []string{"topic1", "topic3", "topic5"},
  1367. },
  1368. "consumer3": {
  1369. Topics: []string{"topic1", "topic3", "topic5"},
  1370. },
  1371. "consumer4": {
  1372. Topics: []string{"topic1", "topic2", "topic3", "topic4", "topic5"},
  1373. },
  1374. }
  1375. topics := make(map[string][]int32, 5)
  1376. for i := 1; i <= 5; i++ {
  1377. partitions := make([]int32, i%2+1)
  1378. for j := 0; j < i%2+1; j++ {
  1379. partitions[j] = int32(j)
  1380. }
  1381. topics[fmt.Sprintf("topic%d", i)] = partitions
  1382. }
  1383. plan, err := s.Plan(members, topics)
  1384. verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
  1385. }
  1386. func Test_stickyBalanceStrategy_Plan_AddRemoveTopicTwoConsumers(t *testing.T) {
  1387. s := &stickyBalanceStrategy{}
  1388. // PLAN 1
  1389. members := map[string]ConsumerGroupMemberMetadata{
  1390. "consumer1": {
  1391. Topics: []string{"topic1"},
  1392. },
  1393. "consumer2": {
  1394. Topics: []string{"topic1"},
  1395. },
  1396. }
  1397. topics := map[string][]int32{
  1398. "topic1": {0, 1, 2},
  1399. }
  1400. plan1, err := s.Plan(members, topics)
  1401. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1402. verifyFullyBalanced(t, plan1)
  1403. // PLAN 2
  1404. members["consumer1"] = ConsumerGroupMemberMetadata{
  1405. Topics: []string{"topic1", "topic2"},
  1406. UserData: encodeSubscriberPlan(t, plan1["consumer1"]),
  1407. }
  1408. members["consumer2"] = ConsumerGroupMemberMetadata{
  1409. Topics: []string{"topic1", "topic2"},
  1410. UserData: encodeSubscriberPlan(t, plan1["consumer2"]),
  1411. }
  1412. topics["topic2"] = []int32{0, 1, 2}
  1413. plan2, err := s.Plan(members, topics)
  1414. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1415. verifyFullyBalanced(t, plan2)
  1416. // PLAN 3
  1417. members["consumer1"] = ConsumerGroupMemberMetadata{
  1418. Topics: []string{"topic1", "topic2"},
  1419. UserData: encodeSubscriberPlan(t, plan2["consumer1"]),
  1420. }
  1421. members["consumer2"] = ConsumerGroupMemberMetadata{
  1422. Topics: []string{"topic1", "topic2"},
  1423. UserData: encodeSubscriberPlan(t, plan2["consumer2"]),
  1424. }
  1425. delete(topics, "topic1")
  1426. plan3, err := s.Plan(members, topics)
  1427. verifyPlanIsBalancedAndSticky(t, s, members, plan3, err)
  1428. verifyFullyBalanced(t, plan3)
  1429. }
  1430. func Test_stickyBalanceStrategy_Plan_ReassignmentAfterOneConsumerLeaves(t *testing.T) {
  1431. s := &stickyBalanceStrategy{}
  1432. // PLAN 1
  1433. members := make(map[string]ConsumerGroupMemberMetadata, 20)
  1434. for i := 0; i < 20; i++ {
  1435. topics := make([]string, 20)
  1436. for j := 0; j < 20; j++ {
  1437. topics[j] = fmt.Sprintf("topic%d", j)
  1438. }
  1439. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
  1440. }
  1441. topics := make(map[string][]int32, 20)
  1442. for i := 0; i < 20; i++ {
  1443. partitions := make([]int32, 20)
  1444. for j := 0; j < 20; j++ {
  1445. partitions[j] = int32(j)
  1446. }
  1447. topics[fmt.Sprintf("topic%d", i)] = partitions
  1448. }
  1449. plan1, err := s.Plan(members, topics)
  1450. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1451. for i := 0; i < 20; i++ {
  1452. topics := make([]string, 20)
  1453. for j := 0; j < 20; j++ {
  1454. topics[j] = fmt.Sprintf("topic%d", j)
  1455. }
  1456. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
  1457. Topics: members[fmt.Sprintf("consumer%d", i)].Topics,
  1458. UserData: encodeSubscriberPlan(t, plan1[fmt.Sprintf("consumer%d", i)]),
  1459. }
  1460. }
  1461. delete(members, "consumer10")
  1462. plan2, err := s.Plan(members, topics)
  1463. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1464. }
  1465. func Test_stickyBalanceStrategy_Plan_ReassignmentAfterOneConsumerAdded(t *testing.T) {
  1466. s := &stickyBalanceStrategy{}
  1467. // PLAN 1
  1468. members := make(map[string]ConsumerGroupMemberMetadata)
  1469. for i := 0; i < 10; i++ {
  1470. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: []string{"topic1"}}
  1471. }
  1472. partitions := make([]int32, 20)
  1473. for j := 0; j < 20; j++ {
  1474. partitions[j] = int32(j)
  1475. }
  1476. topics := map[string][]int32{"topic1": partitions}
  1477. plan1, err := s.Plan(members, topics)
  1478. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1479. // add a new consumer
  1480. members["consumer10"] = ConsumerGroupMemberMetadata{Topics: []string{"topic1"}}
  1481. plan2, err := s.Plan(members, topics)
  1482. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1483. }
  1484. func Test_stickyBalanceStrategy_Plan_SameSubscriptions(t *testing.T) {
  1485. s := &stickyBalanceStrategy{}
  1486. // PLAN 1
  1487. members := make(map[string]ConsumerGroupMemberMetadata, 20)
  1488. for i := 0; i < 9; i++ {
  1489. topics := make([]string, 15)
  1490. for j := 0; j < 15; j++ {
  1491. topics[j] = fmt.Sprintf("topic%d", j)
  1492. }
  1493. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
  1494. }
  1495. topics := make(map[string][]int32, 15)
  1496. for i := 0; i < 15; i++ {
  1497. partitions := make([]int32, i)
  1498. for j := 0; j < i; j++ {
  1499. partitions[j] = int32(j)
  1500. }
  1501. topics[fmt.Sprintf("topic%d", i)] = partitions
  1502. }
  1503. plan1, err := s.Plan(members, topics)
  1504. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1505. // PLAN 2
  1506. for i := 0; i < 9; i++ {
  1507. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
  1508. Topics: members[fmt.Sprintf("consumer%d", i)].Topics,
  1509. UserData: encodeSubscriberPlan(t, plan1[fmt.Sprintf("consumer%d", i)]),
  1510. }
  1511. }
  1512. delete(members, "consumer5")
  1513. plan2, err := s.Plan(members, topics)
  1514. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1515. }
  1516. func Test_stickyBalanceStrategy_Plan_LargeAssignmentWithMultipleConsumersLeaving(t *testing.T) {
  1517. s := &stickyBalanceStrategy{}
  1518. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  1519. // PLAN 1
  1520. members := make(map[string]ConsumerGroupMemberMetadata, 20)
  1521. for i := 0; i < 200; i++ {
  1522. topics := make([]string, 200)
  1523. for j := 0; j < 200; j++ {
  1524. topics[j] = fmt.Sprintf("topic%d", j)
  1525. }
  1526. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
  1527. }
  1528. topics := make(map[string][]int32, 40)
  1529. for i := 0; i < 40; i++ {
  1530. partitionCount := r.Intn(20)
  1531. partitions := make([]int32, partitionCount)
  1532. for j := 0; j < partitionCount; j++ {
  1533. partitions[j] = int32(j)
  1534. }
  1535. topics[fmt.Sprintf("topic%d", i)] = partitions
  1536. }
  1537. plan1, err := s.Plan(members, topics)
  1538. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1539. for i := 0; i < 200; i++ {
  1540. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
  1541. Topics: members[fmt.Sprintf("consumer%d", i)].Topics,
  1542. UserData: encodeSubscriberPlan(t, plan1[fmt.Sprintf("consumer%d", i)]),
  1543. }
  1544. }
  1545. for i := 0; i < 50; i++ {
  1546. delete(members, fmt.Sprintf("consumer%d", i))
  1547. }
  1548. plan2, err := s.Plan(members, topics)
  1549. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1550. }
  1551. func Test_stickyBalanceStrategy_Plan_NewSubscription(t *testing.T) {
  1552. s := &stickyBalanceStrategy{}
  1553. members := make(map[string]ConsumerGroupMemberMetadata, 20)
  1554. for i := 0; i < 3; i++ {
  1555. topics := make([]string, 0)
  1556. for j := i; j <= 3*i-2; j++ {
  1557. topics = append(topics, fmt.Sprintf("topic%d", j))
  1558. }
  1559. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
  1560. }
  1561. topics := make(map[string][]int32, 5)
  1562. for i := 1; i < 5; i++ {
  1563. topics[fmt.Sprintf("topic%d", i)] = []int32{0}
  1564. }
  1565. plan1, err := s.Plan(members, topics)
  1566. if err != nil {
  1567. t.Errorf("stickyBalanceStrategy.Plan() error = %v", err)
  1568. return
  1569. }
  1570. verifyValidityAndBalance(t, members, plan1)
  1571. members["consumer0"] = ConsumerGroupMemberMetadata{Topics: []string{"topic1"}}
  1572. plan2, err := s.Plan(members, topics)
  1573. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1574. }
  1575. func Test_stickyBalanceStrategy_Plan_ReassignmentWithRandomSubscriptionsAndChanges(t *testing.T) {
  1576. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  1577. minNumConsumers := 20
  1578. maxNumConsumers := 40
  1579. minNumTopics := 10
  1580. maxNumTopics := 20
  1581. for round := 0; round < 100; round++ {
  1582. numTopics := minNumTopics + r.Intn(maxNumTopics-minNumTopics)
  1583. topics := make([]string, numTopics)
  1584. partitionsPerTopic := make(map[string][]int32, numTopics)
  1585. for i := 0; i < numTopics; i++ {
  1586. topicName := fmt.Sprintf("topic%d", i)
  1587. topics[i] = topicName
  1588. partitions := make([]int32, maxNumTopics)
  1589. for j := 0; j < maxNumTopics; j++ {
  1590. partitions[j] = int32(j)
  1591. }
  1592. partitionsPerTopic[topicName] = partitions
  1593. }
  1594. numConsumers := minNumConsumers + r.Intn(maxNumConsumers-minNumConsumers)
  1595. members := make(map[string]ConsumerGroupMemberMetadata, numConsumers)
  1596. for i := 0; i < numConsumers; i++ {
  1597. sub := getRandomSublist(r, topics)
  1598. sort.Strings(sub)
  1599. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: sub}
  1600. }
  1601. s := &stickyBalanceStrategy{}
  1602. plan, err := s.Plan(members, partitionsPerTopic)
  1603. verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
  1604. // PLAN 2
  1605. membersPlan2 := make(map[string]ConsumerGroupMemberMetadata, numConsumers)
  1606. for i := 0; i < numConsumers; i++ {
  1607. sub := getRandomSublist(r, topics)
  1608. sort.Strings(sub)
  1609. membersPlan2[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
  1610. Topics: sub,
  1611. UserData: encodeSubscriberPlan(t, plan[fmt.Sprintf("consumer%d", i)]),
  1612. }
  1613. }
  1614. plan2, err := s.Plan(membersPlan2, partitionsPerTopic)
  1615. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1616. }
  1617. }
  1618. func Test_stickyBalanceStrategy_Plan_MoveExistingAssignments(t *testing.T) {
  1619. s := &stickyBalanceStrategy{}
  1620. topics := make(map[string][]int32, 6)
  1621. for i := 1; i <= 6; i++ {
  1622. topics[fmt.Sprintf("topic%d", i)] = []int32{0}
  1623. }
  1624. members := make(map[string]ConsumerGroupMemberMetadata, 3)
  1625. members["consumer1"] = ConsumerGroupMemberMetadata{
  1626. Topics: []string{"topic1", "topic2"},
  1627. UserData: encodeSubscriberPlan(t, map[string][]int32{"topic1": {0}}),
  1628. }
  1629. members["consumer2"] = ConsumerGroupMemberMetadata{
  1630. Topics: []string{"topic1", "topic2", "topic3", "topic4"},
  1631. UserData: encodeSubscriberPlan(t, map[string][]int32{"topic2": {0}, "topic3": {0}}),
  1632. }
  1633. members["consumer3"] = ConsumerGroupMemberMetadata{
  1634. Topics: []string{"topic2", "topic3", "topic4", "topic5", "topic6"},
  1635. UserData: encodeSubscriberPlan(t, map[string][]int32{"topic4": {0}, "topic5": {0}, "topic6": {0}}),
  1636. }
  1637. plan, err := s.Plan(members, topics)
  1638. verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
  1639. }
  1640. func Test_stickyBalanceStrategy_Plan_Stickiness(t *testing.T) {
  1641. s := &stickyBalanceStrategy{}
  1642. topics := map[string][]int32{"topic1": {0, 1, 2}}
  1643. members := map[string]ConsumerGroupMemberMetadata{
  1644. "consumer1": {Topics: []string{"topic1"}},
  1645. "consumer2": {Topics: []string{"topic1"}},
  1646. "consumer3": {Topics: []string{"topic1"}},
  1647. "consumer4": {Topics: []string{"topic1"}},
  1648. }
  1649. plan1, err := s.Plan(members, topics)
  1650. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1651. // PLAN 2
  1652. // remove the potential group leader
  1653. delete(members, "consumer1")
  1654. for i := 2; i <= 4; i++ {
  1655. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
  1656. Topics: []string{"topic1"},
  1657. UserData: encodeSubscriberPlan(t, plan1[fmt.Sprintf("consumer%d", i)]),
  1658. }
  1659. }
  1660. plan2, err := s.Plan(members, topics)
  1661. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1662. }
  1663. func Test_stickyBalanceStrategy_Plan_AssignmentUpdatedForDeletedTopic(t *testing.T) {
  1664. s := &stickyBalanceStrategy{}
  1665. topics := make(map[string][]int32, 2)
  1666. topics["topic1"] = []int32{0}
  1667. topics["topic3"] = make([]int32, 100)
  1668. for i := 0; i < 100; i++ {
  1669. topics["topic3"][i] = int32(i)
  1670. }
  1671. members := map[string]ConsumerGroupMemberMetadata{
  1672. "consumer1": {Topics: []string{"topic1", "topic2", "topic3"}},
  1673. }
  1674. plan, err := s.Plan(members, topics)
  1675. verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
  1676. verifyFullyBalanced(t, plan)
  1677. if (len(plan["consumer1"]["topic1"]) + len(plan["consumer1"]["topic3"])) != 101 {
  1678. t.Error("Incorrect number of partitions assigned")
  1679. return
  1680. }
  1681. }
  1682. func Test_stickyBalanceStrategy_Plan_NoExceptionRaisedWhenOnlySubscribedTopicDeleted(t *testing.T) {
  1683. s := &stickyBalanceStrategy{}
  1684. topics := map[string][]int32{"topic1": {0, 1, 2}}
  1685. members := map[string]ConsumerGroupMemberMetadata{
  1686. "consumer1": {Topics: []string{"topic1"}},
  1687. }
  1688. plan1, err := s.Plan(members, topics)
  1689. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1690. // PLAN 2
  1691. members["consumer1"] = ConsumerGroupMemberMetadata{
  1692. Topics: members["consumer1"].Topics,
  1693. UserData: encodeSubscriberPlan(t, plan1["consumer1"]),
  1694. }
  1695. plan2, err := s.Plan(members, map[string][]int32{})
  1696. if len(plan2) != 1 {
  1697. t.Error("Incorrect number of consumers")
  1698. return
  1699. }
  1700. if len(plan2["consumer1"]) != 0 {
  1701. t.Error("Incorrect number of consumer topic assignments")
  1702. return
  1703. }
  1704. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1705. }
  1706. func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations1(t *testing.T) {
  1707. s := &stickyBalanceStrategy{}
  1708. topics := map[string][]int32{"topic1": {0, 1, 2, 3, 4, 5}}
  1709. members := map[string]ConsumerGroupMemberMetadata{
  1710. "consumer1": {Topics: []string{"topic1"}},
  1711. "consumer2": {Topics: []string{"topic1"}},
  1712. "consumer3": {Topics: []string{"topic1"}},
  1713. }
  1714. plan1, err := s.Plan(members, topics)
  1715. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1716. verifyFullyBalanced(t, plan1)
  1717. // PLAN 2
  1718. members["consumer1"] = ConsumerGroupMemberMetadata{
  1719. Topics: []string{"topic1"},
  1720. UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer1"], 1),
  1721. }
  1722. members["consumer2"] = ConsumerGroupMemberMetadata{
  1723. Topics: []string{"topic1"},
  1724. UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer2"], 1),
  1725. }
  1726. delete(members, "consumer3")
  1727. plan2, err := s.Plan(members, topics)
  1728. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1729. verifyFullyBalanced(t, plan2)
  1730. if len(intersection(plan1["consumer1"]["topic1"], plan2["consumer1"]["topic1"])) != 2 {
  1731. t.Error("stickyBalanceStrategy.Plan() consumer1 didn't maintain partitions across reassignment")
  1732. }
  1733. if len(intersection(plan1["consumer2"]["topic1"], plan2["consumer2"]["topic1"])) != 2 {
  1734. t.Error("stickyBalanceStrategy.Plan() consumer1 didn't maintain partitions across reassignment")
  1735. }
  1736. // PLAN 3
  1737. delete(members, "consumer1")
  1738. members["consumer2"] = ConsumerGroupMemberMetadata{
  1739. Topics: []string{"topic1"},
  1740. UserData: encodeSubscriberPlanWithGeneration(t, plan2["consumer2"], 2),
  1741. }
  1742. members["consumer3"] = ConsumerGroupMemberMetadata{
  1743. Topics: []string{"topic1"},
  1744. UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer3"], 1),
  1745. }
  1746. plan3, err := s.Plan(members, topics)
  1747. verifyPlanIsBalancedAndSticky(t, s, members, plan3, err)
  1748. verifyFullyBalanced(t, plan3)
  1749. }
  1750. func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations2(t *testing.T) {
  1751. s := &stickyBalanceStrategy{}
  1752. topics := map[string][]int32{"topic1": {0, 1, 2, 3, 4, 5}}
  1753. members := map[string]ConsumerGroupMemberMetadata{
  1754. "consumer1": {Topics: []string{"topic1"}},
  1755. "consumer2": {Topics: []string{"topic1"}},
  1756. "consumer3": {Topics: []string{"topic1"}},
  1757. }
  1758. plan1, err := s.Plan(members, topics)
  1759. verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
  1760. verifyFullyBalanced(t, plan1)
  1761. // PLAN 2
  1762. delete(members, "consumer1")
  1763. members["consumer2"] = ConsumerGroupMemberMetadata{
  1764. Topics: []string{"topic1"},
  1765. UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer2"], 1),
  1766. }
  1767. delete(members, "consumer3")
  1768. plan2, err := s.Plan(members, topics)
  1769. verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
  1770. verifyFullyBalanced(t, plan2)
  1771. if len(intersection(plan1["consumer2"]["topic1"], plan2["consumer2"]["topic1"])) != 2 {
  1772. t.Error("stickyBalanceStrategy.Plan() consumer1 didn't maintain partitions across reassignment")
  1773. }
  1774. // PLAN 3
  1775. members["consumer1"] = ConsumerGroupMemberMetadata{
  1776. Topics: []string{"topic1"},
  1777. UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer1"], 1),
  1778. }
  1779. members["consumer2"] = ConsumerGroupMemberMetadata{
  1780. Topics: []string{"topic1"},
  1781. UserData: encodeSubscriberPlanWithGeneration(t, plan2["consumer2"], 2),
  1782. }
  1783. members["consumer3"] = ConsumerGroupMemberMetadata{
  1784. Topics: []string{"topic1"},
  1785. UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer3"], 1),
  1786. }
  1787. plan3, err := s.Plan(members, topics)
  1788. verifyPlanIsBalancedAndSticky(t, s, members, plan3, err)
  1789. verifyFullyBalanced(t, plan3)
  1790. }
  1791. func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGenerations(t *testing.T) {
  1792. s := &stickyBalanceStrategy{}
  1793. topics := map[string][]int32{"topic1": {0, 1, 2, 3, 4, 5}}
  1794. members := make(map[string]ConsumerGroupMemberMetadata, 3)
  1795. members["consumer1"] = ConsumerGroupMemberMetadata{
  1796. Topics: []string{"topic1"},
  1797. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 1, 4}}, 1),
  1798. }
  1799. members["consumer2"] = ConsumerGroupMemberMetadata{
  1800. Topics: []string{"topic1"},
  1801. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 2, 3}}, 1),
  1802. }
  1803. members["consumer3"] = ConsumerGroupMemberMetadata{
  1804. Topics: []string{"topic1"},
  1805. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {3, 4, 5}}, 2),
  1806. }
  1807. plan, err := s.Plan(members, topics)
  1808. verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
  1809. verifyFullyBalanced(t, plan)
  1810. }
  1811. func Test_stickyBalanceStrategy_Plan_SchemaBackwardCompatibility(t *testing.T) {
  1812. s := &stickyBalanceStrategy{}
  1813. topics := map[string][]int32{"topic1": {0, 1, 2}}
  1814. members := make(map[string]ConsumerGroupMemberMetadata, 3)
  1815. members["consumer1"] = ConsumerGroupMemberMetadata{
  1816. Topics: []string{"topic1"},
  1817. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 2}}, 1),
  1818. }
  1819. members["consumer2"] = ConsumerGroupMemberMetadata{
  1820. Topics: []string{"topic1"},
  1821. UserData: encodeSubscriberPlanWithOldSchema(t, map[string][]int32{"topic1": {1}}),
  1822. }
  1823. members["consumer3"] = ConsumerGroupMemberMetadata{Topics: []string{"topic1"}}
  1824. plan, err := s.Plan(members, topics)
  1825. verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
  1826. verifyFullyBalanced(t, plan)
  1827. }
  1828. func Test_stickyBalanceStrategy_Plan_ConflictingPreviousAssignments(t *testing.T) {
  1829. s := &stickyBalanceStrategy{}
  1830. topics := map[string][]int32{"topic1": {0, 1}}
  1831. members := make(map[string]ConsumerGroupMemberMetadata, 2)
  1832. members["consumer1"] = ConsumerGroupMemberMetadata{
  1833. Topics: []string{"topic1"},
  1834. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 1}}, 1),
  1835. }
  1836. members["consumer2"] = ConsumerGroupMemberMetadata{
  1837. Topics: []string{"topic1"},
  1838. UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 1}}, 1),
  1839. }
  1840. plan, err := s.Plan(members, topics)
  1841. verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
  1842. verifyFullyBalanced(t, plan)
  1843. }
  1844. func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopics(b *testing.B) {
  1845. s := &stickyBalanceStrategy{}
  1846. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  1847. members := make(map[string]ConsumerGroupMemberMetadata, 20)
  1848. for i := 0; i < 200; i++ {
  1849. topics := make([]string, 200)
  1850. for j := 0; j < 200; j++ {
  1851. topics[j] = fmt.Sprintf("topic%d", j)
  1852. }
  1853. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
  1854. }
  1855. topics := make(map[string][]int32, 40)
  1856. for i := 0; i < 40; i++ {
  1857. partitionCount := r.Intn(20)
  1858. partitions := make([]int32, partitionCount)
  1859. for j := 0; j < partitionCount; j++ {
  1860. partitions[j] = int32(j)
  1861. }
  1862. topics[fmt.Sprintf("topic%d", i)] = partitions
  1863. }
  1864. b.ResetTimer()
  1865. for n := 0; n < b.N; n++ {
  1866. if _, err := s.Plan(members, topics); err != nil {
  1867. b.Errorf("Error building plan in benchmark: %v", err)
  1868. }
  1869. }
  1870. }
  1871. func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopicsAndExistingAssignments(b *testing.B) {
  1872. s := &stickyBalanceStrategy{}
  1873. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  1874. members := make(map[string]ConsumerGroupMemberMetadata, 20)
  1875. for i := 0; i < 200; i++ {
  1876. topics := make([]string, 200)
  1877. for j := 0; j < 200; j++ {
  1878. topics[j] = fmt.Sprintf("topic%d", j)
  1879. }
  1880. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
  1881. }
  1882. topics := make(map[string][]int32, 40)
  1883. for i := 0; i < 40; i++ {
  1884. partitionCount := r.Intn(20)
  1885. partitions := make([]int32, partitionCount)
  1886. for j := 0; j < partitionCount; j++ {
  1887. partitions[j] = int32(j)
  1888. }
  1889. topics[fmt.Sprintf("topic%d", i)] = partitions
  1890. }
  1891. plan, _ := s.Plan(members, topics)
  1892. for i := 0; i < 200; i++ {
  1893. members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
  1894. Topics: members[fmt.Sprintf("consumer%d", i)].Topics,
  1895. UserData: encodeSubscriberPlanWithGenerationForBenchmark(b, plan[fmt.Sprintf("consumer%d", i)], 1),
  1896. }
  1897. }
  1898. for i := 0; i < 1; i++ {
  1899. delete(members, fmt.Sprintf("consumer%d", i))
  1900. }
  1901. b.ResetTimer()
  1902. for n := 0; n < b.N; n++ {
  1903. if _, err := s.Plan(members, topics); err != nil {
  1904. b.Errorf("Error building plan in benchmark: %v", err)
  1905. }
  1906. }
  1907. }
  1908. func verifyPlanIsBalancedAndSticky(t *testing.T, s *stickyBalanceStrategy, members map[string]ConsumerGroupMemberMetadata, plan BalanceStrategyPlan, err error) {
  1909. if err != nil {
  1910. t.Errorf("stickyBalanceStrategy.Plan() error = %v", err)
  1911. return
  1912. }
  1913. if !s.movements.isSticky() {
  1914. t.Error("stickyBalanceStrategy.Plan() not sticky")
  1915. return
  1916. }
  1917. verifyValidityAndBalance(t, members, plan)
  1918. }
  1919. func verifyValidityAndBalance(t *testing.T, consumers map[string]ConsumerGroupMemberMetadata, plan BalanceStrategyPlan) {
  1920. size := len(consumers)
  1921. if size != len(plan) {
  1922. t.Errorf("Subscription size (%d) not equal to plan size (%d)", size, len(plan))
  1923. t.FailNow()
  1924. }
  1925. members := make([]string, size)
  1926. i := 0
  1927. for memberID := range consumers {
  1928. members[i] = memberID
  1929. i++
  1930. }
  1931. sort.Strings(members)
  1932. for i, memberID := range members {
  1933. for assignedTopic := range plan[memberID] {
  1934. found := false
  1935. for _, assignableTopic := range consumers[memberID].Topics {
  1936. if assignableTopic == assignableTopic {
  1937. found = true
  1938. break
  1939. }
  1940. }
  1941. if !found {
  1942. t.Errorf("Consumer %s had assigned topic %s that wasn't in the list of assignable topics", memberID, assignedTopic)
  1943. t.FailNow()
  1944. }
  1945. }
  1946. // skip last consumer
  1947. if i == len(members)-1 {
  1948. continue
  1949. }
  1950. consumerAssignments := make([]topicPartitionAssignment, 0)
  1951. for topic, partitions := range plan[memberID] {
  1952. for _, partition := range partitions {
  1953. consumerAssignments = append(consumerAssignments, topicPartitionAssignment{Topic: topic, Partition: partition})
  1954. }
  1955. }
  1956. for j := i + 1; j < size; j++ {
  1957. otherConsumer := members[j]
  1958. otherConsumerAssignments := make([]topicPartitionAssignment, 0)
  1959. for topic, partitions := range plan[otherConsumer] {
  1960. for _, partition := range partitions {
  1961. otherConsumerAssignments = append(otherConsumerAssignments, topicPartitionAssignment{Topic: topic, Partition: partition})
  1962. }
  1963. }
  1964. assignmentsIntersection := intersection(consumerAssignments, otherConsumerAssignments)
  1965. if len(assignmentsIntersection) > 0 {
  1966. t.Errorf("Consumers %s and %s have common partitions assigned to them: %v", memberID, otherConsumer, assignmentsIntersection)
  1967. t.FailNow()
  1968. }
  1969. if math.Abs(float64(len(consumerAssignments)-len(otherConsumerAssignments))) <= 1 {
  1970. continue
  1971. }
  1972. if len(consumerAssignments) > len(otherConsumerAssignments) {
  1973. for _, topic := range consumerAssignments {
  1974. if _, exists := plan[otherConsumer][topic.Topic]; exists {
  1975. t.Errorf("Some partitions can be moved from %s to %s to achieve a better balance, %s has %d assignments, and %s has %d assignments", otherConsumer, memberID, memberID, len(consumerAssignments), otherConsumer, len(otherConsumerAssignments))
  1976. t.FailNow()
  1977. }
  1978. }
  1979. }
  1980. if len(otherConsumerAssignments) > len(consumerAssignments) {
  1981. for _, topic := range otherConsumerAssignments {
  1982. if _, exists := plan[memberID][topic.Topic]; exists {
  1983. t.Errorf("Some partitions can be moved from %s to %s to achieve a better balance, %s has %d assignments, and %s has %d assignments", memberID, otherConsumer, otherConsumer, len(otherConsumerAssignments), memberID, len(consumerAssignments))
  1984. t.FailNow()
  1985. }
  1986. }
  1987. }
  1988. }
  1989. }
  1990. }
  1991. // Produces the intersection of two slices
  1992. // From https://github.com/juliangruber/go-intersect
  1993. func intersection(a interface{}, b interface{}) []interface{} {
  1994. set := make([]interface{}, 0)
  1995. hash := make(map[interface{}]bool)
  1996. av := reflect.ValueOf(a)
  1997. bv := reflect.ValueOf(b)
  1998. for i := 0; i < av.Len(); i++ {
  1999. el := av.Index(i).Interface()
  2000. hash[el] = true
  2001. }
  2002. for i := 0; i < bv.Len(); i++ {
  2003. el := bv.Index(i).Interface()
  2004. if _, found := hash[el]; found {
  2005. set = append(set, el)
  2006. }
  2007. }
  2008. return set
  2009. }
  2010. func encodeSubscriberPlan(t *testing.T, assignments map[string][]int32) []byte {
  2011. return encodeSubscriberPlanWithGeneration(t, assignments, defaultGeneration)
  2012. }
  2013. func encodeSubscriberPlanWithGeneration(t *testing.T, assignments map[string][]int32, generation int32) []byte {
  2014. userDataBytes, err := encode(&StickyAssignorUserDataV1{
  2015. Topics: assignments,
  2016. Generation: generation,
  2017. }, nil)
  2018. if err != nil {
  2019. t.Errorf("encodeSubscriberPlan error = %v", err)
  2020. t.FailNow()
  2021. }
  2022. return userDataBytes
  2023. }
  2024. func encodeSubscriberPlanWithGenerationForBenchmark(b *testing.B, assignments map[string][]int32, generation int32) []byte {
  2025. userDataBytes, err := encode(&StickyAssignorUserDataV1{
  2026. Topics: assignments,
  2027. Generation: generation,
  2028. }, nil)
  2029. if err != nil {
  2030. b.Errorf("encodeSubscriberPlan error = %v", err)
  2031. b.FailNow()
  2032. }
  2033. return userDataBytes
  2034. }
  2035. func encodeSubscriberPlanWithOldSchema(t *testing.T, assignments map[string][]int32) []byte {
  2036. userDataBytes, err := encode(&StickyAssignorUserDataV0{
  2037. Topics: assignments,
  2038. }, nil)
  2039. if err != nil {
  2040. t.Errorf("encodeSubscriberPlan error = %v", err)
  2041. t.FailNow()
  2042. }
  2043. return userDataBytes
  2044. }
  2045. // verify that the plan is fully balanced, assumes that all consumers can
  2046. // consume from the same set of topics
  2047. func verifyFullyBalanced(t *testing.T, plan BalanceStrategyPlan) {
  2048. min := math.MaxInt32
  2049. max := math.MinInt32
  2050. for _, topics := range plan {
  2051. assignedPartitionsCount := 0
  2052. for _, partitions := range topics {
  2053. assignedPartitionsCount += len(partitions)
  2054. }
  2055. if assignedPartitionsCount < min {
  2056. min = assignedPartitionsCount
  2057. }
  2058. if assignedPartitionsCount > max {
  2059. max = assignedPartitionsCount
  2060. }
  2061. }
  2062. if (max - min) > 1 {
  2063. t.Errorf("Plan partition assignment is not fully balanced: min=%d, max=%d", min, max)
  2064. }
  2065. }
  2066. func getRandomSublist(r *rand.Rand, s []string) []string {
  2067. howManyToRemove := r.Intn(len(s))
  2068. allEntriesMap := make(map[int]string)
  2069. for i, s := range s {
  2070. allEntriesMap[i] = s
  2071. }
  2072. for i := 0; i < howManyToRemove; i++ {
  2073. delete(allEntriesMap, r.Intn(len(allEntriesMap)))
  2074. }
  2075. subList := make([]string, len(allEntriesMap))
  2076. i := 0
  2077. for _, s := range allEntriesMap {
  2078. subList[i] = s
  2079. i++
  2080. }
  2081. return subList
  2082. }
  2083. func Test_sortPartitionsByPotentialConsumerAssignments(t *testing.T) {
  2084. type args struct {
  2085. partition2AllPotentialConsumers map[topicPartitionAssignment][]string
  2086. }
  2087. tests := []struct {
  2088. name string
  2089. args args
  2090. want []topicPartitionAssignment
  2091. }{
  2092. {
  2093. name: "Single topic partition",
  2094. args: args{
  2095. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  2096. {
  2097. Topic: "t1",
  2098. Partition: 0,
  2099. }: {"c1", "c2"},
  2100. },
  2101. },
  2102. want: []topicPartitionAssignment{
  2103. {
  2104. Topic: "t1",
  2105. Partition: 0,
  2106. },
  2107. },
  2108. },
  2109. {
  2110. name: "Multiple topic partitions with the same number of consumers but different topic names",
  2111. args: args{
  2112. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  2113. {
  2114. Topic: "t1",
  2115. Partition: 0,
  2116. }: {"c1", "c2"},
  2117. {
  2118. Topic: "t2",
  2119. Partition: 0,
  2120. }: {"c1", "c2"},
  2121. },
  2122. },
  2123. want: []topicPartitionAssignment{
  2124. {
  2125. Topic: "t1",
  2126. Partition: 0,
  2127. },
  2128. {
  2129. Topic: "t2",
  2130. Partition: 0,
  2131. },
  2132. },
  2133. },
  2134. {
  2135. name: "Multiple topic partitions with the same number of consumers and topic names",
  2136. args: args{
  2137. partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
  2138. {
  2139. Topic: "t1",
  2140. Partition: 0,
  2141. }: {"c1", "c2"},
  2142. {
  2143. Topic: "t1",
  2144. Partition: 1,
  2145. }: {"c1", "c2"},
  2146. },
  2147. },
  2148. want: []topicPartitionAssignment{
  2149. {
  2150. Topic: "t1",
  2151. Partition: 0,
  2152. },
  2153. {
  2154. Topic: "t1",
  2155. Partition: 1,
  2156. },
  2157. },
  2158. },
  2159. }
  2160. for _, tt := range tests {
  2161. t.Run(tt.name, func(t *testing.T) {
  2162. if got := sortPartitionsByPotentialConsumerAssignments(tt.args.partition2AllPotentialConsumers); !reflect.DeepEqual(got, tt.want) {
  2163. t.Errorf("sortPartitionsByPotentialConsumerAssignments() = %v, want %v", got, tt.want)
  2164. }
  2165. })
  2166. }
  2167. }