|
@@ -373,8 +373,8 @@ func getBalanceScore(assignment map[string][]topicPartitionAssignment) int {
|
|
|
}
|
|
|
|
|
|
// Determine whether the current assignment plan is balanced.
|
|
|
-func isBalanced(currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, allSubscriptions map[string][]topicPartitionAssignment) bool {
|
|
|
- sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
|
|
|
+func isBalanced(currentAssignment map[string][]topicPartitionAssignment, allSubscriptions map[string][]topicPartitionAssignment) bool {
|
|
|
+ sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
|
|
|
min := len(currentAssignment[sortedCurrentSubscriptions[0]])
|
|
|
max := len(currentAssignment[sortedCurrentSubscriptions[len(sortedCurrentSubscriptions)-1]])
|
|
|
if min >= max-1 {
|
|
@@ -430,7 +430,7 @@ func (s *stickyBalanceStrategy) performReassignments(reassignablePartitions []to
|
|
|
// reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
|
|
|
// until the full list is processed or a balance is achieved
|
|
|
for _, partition := range reassignablePartitions {
|
|
|
- if isBalanced(currentAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions) {
|
|
|
+ if isBalanced(currentAssignment, consumer2AllPotentialPartitions) {
|
|
|
break
|
|
|
}
|
|
|
|