Browse Source

PR feedback

Scott Kidder 5 years ago
parent
commit
e52742d7de
2 changed files with 133 additions and 165 deletions
  1. 47 69
      balance_strategy.go
  2. 86 96
      balance_strategy_test.go

+ 47 - 69
balance_strategy.go

@@ -444,7 +444,6 @@ func (s *stickyBalanceStrategy) performReassignments(reassignablePartitions []to
 					modified = true
 					break
 				}
-
 			}
 		}
 		if !modified {
@@ -550,23 +549,6 @@ func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssig
 	return assignments
 }
 
-func removeValueFromStringSlice(s []string, e string) []string {
-	for i, v := range s {
-		if v == e {
-			s = append(s[:i], s[i+1:]...)
-			break
-		}
-	}
-	return s
-}
-
-func removeIndexFromStringSlice(s []string, i int) []string {
-	if len(s) == 0 {
-		return s
-	}
-	return append(s[:i], s[i+1:]...)
-}
-
 func removeTopicPartitionFromMemberAssignments(assignments []topicPartitionAssignment, topic topicPartitionAssignment) []topicPartitionAssignment {
 	for i, assignment := range assignments {
 		if assignment == topic {
@@ -586,6 +568,11 @@ func memberAssignmentsIncludeTopicPartition(assignments []topicPartitionAssignme
 }
 
 func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment {
+	unassignedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
+	for partition := range partition2AllPotentialConsumers {
+		unassignedPartitions[partition] = true
+	}
+
 	sortedPartitions := make([]topicPartitionAssignment, 0)
 	if !isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions) {
 		// if this is a reassignment and the subscriptions are identical (all consumers can consumer from all topics)
@@ -601,7 +588,6 @@ func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, par
 			pq[i] = &consumerGroupMember{
 				id:          consumerID,
 				assignments: consumerAssignments,
-				index:       i,
 			}
 			i++
 		}
@@ -615,43 +601,36 @@ func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, par
 			member := pq[0]
 
 			// partitions that were assigned to a different consumer last time
-			prevPartitions := make([]topicPartitionAssignment, 0)
-			for partition := range partitionsWithADifferentPreviousAssignment {
-				// from partitions that had a different consumer before, keep only those that are assigned to this consumer now
-				if memberAssignmentsIncludeTopicPartition(member.assignments, partition) {
-					prevPartitions = append(prevPartitions, partition)
+			var prevPartition *topicPartitionAssignment
+			var prevPartitionIndex int
+			for i, partition := range member.assignments {
+				if _, exists := partitionsWithADifferentPreviousAssignment[partition]; exists {
+					prevPartition = &partition
+					prevPartitionIndex = i
+					break
 				}
 			}
 
-			if len(prevPartitions) > 0 {
+			if prevPartition != nil {
 				// if there is a partition on this consumer that was assigned to another consumer before mark it as good options for reassignment
-				partition := prevPartitions[0]
-				prevPartitions = append(prevPartitions[:0], prevPartitions[1:]...)
-				member.assignments = removeTopicPartitionFromMemberAssignments(member.assignments, partition)
-				sortedPartitions = append(sortedPartitions, partition)
-				heap.Fix(&pq, member.index)
+				member.assignments = append(member.assignments[:prevPartitionIndex], member.assignments[prevPartitionIndex+1:]...)
+				sortedPartitions = append(sortedPartitions, *prevPartition)
+				delete(unassignedPartitions, *prevPartition)
+				heap.Fix(&pq, 0)
 			} else if len(member.assignments) > 0 {
 				// otherwise, mark any other one of the current partitions as a reassignment candidate
 				partition := member.assignments[0]
 				member.assignments = append(member.assignments[:0], member.assignments[1:]...)
 				sortedPartitions = append(sortedPartitions, partition)
-				heap.Fix(&pq, member.index)
+				delete(unassignedPartitions, partition)
+				heap.Fix(&pq, 0)
 			} else {
-				heap.Remove(&pq, 0)
+				heap.Pop(&pq)
 			}
 		}
 
-		for partition := range partition2AllPotentialConsumers {
-			found := false
-			for _, p := range sortedPartitions {
-				if partition == p {
-					found = true
-					break
-				}
-			}
-			if !found {
-				sortedPartitions = append(sortedPartitions, partition)
-			}
+		for partition := range unassignedPartitions {
+			sortedPartitions = append(sortedPartitions, partition)
 		}
 	} else {
 		// an ascending sorted set of topic partitions based on how many consumers can potentially use them
@@ -678,12 +657,21 @@ func sortMemberIDsByPartitionAssignments(assignments map[string][]topicPartition
 
 func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers map[topicPartitionAssignment][]string) []topicPartitionAssignment {
 	// sort the members by the number of partition assignments in descending order
-	sortedPartionIDs := make([]topicPartitionAssignment, 0, len(partition2AllPotentialConsumers))
+	sortedPartionIDs := make([]topicPartitionAssignment, len(partition2AllPotentialConsumers))
+	i := 0
 	for partition := range partition2AllPotentialConsumers {
-		sortedPartionIDs = append(sortedPartionIDs, partition)
-	}
-	sort.SliceStable(sortedPartionIDs, func(i, j int) bool {
-		return len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) > len(partition2AllPotentialConsumers[sortedPartionIDs[j]])
+		sortedPartionIDs[i] = partition
+		i++
+	}
+	sort.Slice(sortedPartionIDs, func(i, j int) bool {
+		if len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) == len(partition2AllPotentialConsumers[sortedPartionIDs[j]]) {
+			ret := strings.Compare(sortedPartionIDs[i].Topic, sortedPartionIDs[j].Topic)
+			if ret == 0 {
+				return sortedPartionIDs[i].Partition < sortedPartionIDs[j].Partition
+			}
+			return ret < 0
+		}
+		return len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) < len(partition2AllPotentialConsumers[sortedPartionIDs[j]])
 	})
 	return sortedPartionIDs
 }
@@ -805,22 +793,18 @@ func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadat
 		}
 		sort.Sort(sort.Reverse(sort.IntSlice(generations)))
 
-		if len(generations) > 0 {
-			consumer := consumers[generations[0]]
-			currentConsumerAssignments, exists := currentAssignment[consumer]
-			if !exists {
-				currentConsumerAssignments = []topicPartitionAssignment{partition}
-			} else {
-				currentConsumerAssignments = append(currentConsumerAssignments, partition)
-			}
-			currentAssignment[consumer] = currentConsumerAssignments
+		consumer := consumers[generations[0]]
+		if _, exists := currentAssignment[consumer]; !exists {
+			currentAssignment[consumer] = []topicPartitionAssignment{partition}
+		} else {
+			currentAssignment[consumer] = append(currentAssignment[consumer], partition)
+		}
 
-			// check for previous assignment, if any
-			if len(generations) > 1 {
-				prevAssignment[partition] = consumerGenerationPair{
-					MemberID:   consumers[generations[1]],
-					Generation: generations[1],
-				}
+		// check for previous assignment, if any
+		if len(generations) > 1 {
+			prevAssignment[partition] = consumerGenerationPair{
+				MemberID:   consumers[generations[1]],
+				Generation: generations[1],
 			}
 		}
 	}
@@ -1038,7 +1022,6 @@ nextCand:
 }
 
 type consumerGroupMember struct {
-	index       int // the index of the consumer group member on the heap
 	id          string
 	assignments []topicPartitionAssignment
 }
@@ -1059,14 +1042,10 @@ func (pq assignmentPriorityQueue) Less(i, j int) bool {
 
 func (pq assignmentPriorityQueue) Swap(i, j int) {
 	pq[i], pq[j] = pq[j], pq[i]
-	pq[i].index = i
-	pq[j].index = j
 }
 
 func (pq *assignmentPriorityQueue) Push(x interface{}) {
-	n := len(*pq)
 	member := x.(*consumerGroupMember)
-	member.index = n
 	*pq = append(*pq, member)
 }
 
@@ -1074,7 +1053,6 @@ func (pq *assignmentPriorityQueue) Pop() interface{} {
 	old := *pq
 	n := len(old)
 	member := old[n-1]
-	member.index = -1 // for safety
 	*pq = old[0 : n-1]
 	return member
 }

+ 86 - 96
balance_strategy_test.go

@@ -880,102 +880,6 @@ func Test_removeTopicPartitionFromMemberAssignments(t *testing.T) {
 	}
 }
 
-func Test_removeIndexFromStringSlice(t *testing.T) {
-	type args struct {
-		s []string
-		i int
-	}
-	tests := []struct {
-		name string
-		args args
-		want []string
-	}{
-		{
-			name: "Empty slice",
-			args: args{
-				s: make([]string, 0),
-				i: 0,
-			},
-			want: make([]string, 0),
-		},
-		{
-			name: "Slice with single entry",
-			args: args{
-				s: []string{"foo"},
-				i: 0,
-			},
-			want: make([]string, 0),
-		},
-		{
-			name: "Slice with multiple entries",
-			args: args{
-				s: []string{"a", "b", "c"},
-				i: 0,
-			},
-			want: []string{"b", "c"},
-		},
-		{
-			name: "Slice with multiple entries and index is in the middle",
-			args: args{
-				s: []string{"a", "b", "c"},
-				i: 1,
-			},
-			want: []string{"a", "c"},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			if got := removeIndexFromStringSlice(tt.args.s, tt.args.i); !reflect.DeepEqual(got, tt.want) {
-				t.Errorf("removeIndexFromSlice() = %v, want %v", got, tt.want)
-			}
-		})
-	}
-}
-
-func Test_removeValueFromStringSlice(t *testing.T) {
-	type args struct {
-		s []string
-		e string
-	}
-	tests := []struct {
-		name string
-		args args
-		want []string
-	}{
-		{
-			name: "Empty input slice",
-			args: args{
-				s: []string{},
-				e: "",
-			},
-			want: []string{},
-		},
-		{
-			name: "Input slice with one entry that doesn't match",
-			args: args{
-				s: []string{"a"},
-				e: "b",
-			},
-			want: []string{"a"},
-		},
-		{
-			name: "Input slice with multiple entries and a positive match",
-			args: args{
-				s: []string{"a", "b", "c"},
-				e: "b",
-			},
-			want: []string{"a", "c"},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			if got := removeValueFromStringSlice(tt.args.s, tt.args.e); !reflect.DeepEqual(got, tt.want) {
-				t.Errorf("removeValueFromSlice() = %v, want %v", got, tt.want)
-			}
-		})
-	}
-}
-
 func Test_assignPartition(t *testing.T) {
 	type args struct {
 		partition                       topicPartitionAssignment
@@ -2276,3 +2180,89 @@ func getRandomSublist(r *rand.Rand, s []string) []string {
 	}
 	return subList
 }
+
+func Test_sortPartitionsByPotentialConsumerAssignments(t *testing.T) {
+	type args struct {
+		partition2AllPotentialConsumers map[topicPartitionAssignment][]string
+	}
+	tests := []struct {
+		name string
+		args args
+		want []topicPartitionAssignment
+	}{
+		{
+			name: "Single topic partition",
+			args: args{
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{
+						Topic:     "t1",
+						Partition: 0,
+					}: []string{"c1", "c2"},
+				},
+			},
+			want: []topicPartitionAssignment{
+				topicPartitionAssignment{
+					Topic:     "t1",
+					Partition: 0,
+				},
+			},
+		},
+		{
+			name: "Multiple topic partitions with the same number of consumers but different topic names",
+			args: args{
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{
+						Topic:     "t1",
+						Partition: 0,
+					}: []string{"c1", "c2"},
+					topicPartitionAssignment{
+						Topic:     "t2",
+						Partition: 0,
+					}: []string{"c1", "c2"},
+				},
+			},
+			want: []topicPartitionAssignment{
+				topicPartitionAssignment{
+					Topic:     "t1",
+					Partition: 0,
+				},
+				topicPartitionAssignment{
+					Topic:     "t2",
+					Partition: 0,
+				},
+			},
+		},
+		{
+			name: "Multiple topic partitions with the same number of consumers and topic names",
+			args: args{
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{
+						Topic:     "t1",
+						Partition: 0,
+					}: []string{"c1", "c2"},
+					topicPartitionAssignment{
+						Topic:     "t1",
+						Partition: 1,
+					}: []string{"c1", "c2"},
+				},
+			},
+			want: []topicPartitionAssignment{
+				topicPartitionAssignment{
+					Topic:     "t1",
+					Partition: 0,
+				},
+				topicPartitionAssignment{
+					Topic:     "t1",
+					Partition: 1,
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := sortPartitionsByPotentialConsumerAssignments(tt.args.partition2AllPotentialConsumers); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("sortPartitionsByPotentialConsumerAssignments() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}