Browse Source

Correctly identify consumers that are no longer subscribed to topics

Scott Kidder 5 years ago
parent
commit
e46c30deb2
2 changed files with 75 additions and 46 deletions
  1. 41 46
      balance_strategy.go
  2. 34 0
      balance_strategy_test.go

+ 41 - 46
balance_strategy.go

@@ -214,54 +214,45 @@ func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetad
 	}
 
 	// create a mapping of each partition to its current consumer, where possible
-	currentPartitionConsumer := make(map[topicPartitionAssignment]string, len(currentAssignment))
-	for memberID, partitions := range currentAssignment {
-		for _, partition := range partitions {
-			currentPartitionConsumer[partition] = memberID
-		}
+	currentPartitionConsumers := make(map[topicPartitionAssignment]string, len(currentAssignment))
+	unvisitedPartitions := make(map[topicPartitionAssignment][]string, len(partition2AllPotentialConsumers))
+	for partition := range partition2AllPotentialConsumers {
+		unvisitedPartitions[partition] = []string{}
 	}
-
-	// sort the topic partitions in order of priority for reassignment
-	sortedPartitions := sortPartitions(currentAssignment, prevAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions)
-	unassignedPartitions := deepCopyPartitions(sortedPartitions)
+	var unassignedPartitions []topicPartitionAssignment
 	for memberID, partitions := range currentAssignment {
-		// if a consumer that existed before (and had some partition assignments) is now removed, remove it from currentAssignment
-		if _, exists := members[memberID]; !exists {
-			for _, partition := range partitions {
-				delete(currentPartitionConsumer, partition)
-			}
-			delete(currentAssignment, memberID)
-			continue
-		}
-
-		// otherwise (the consumer still exists)
-		updatedPartitions := deepCopyPartitions(partitions)
+		var keepPartitions []topicPartitionAssignment
 		for _, partition := range partitions {
+			// If this partition no longer exists at all, likely due to the
+			// topic being deleted, we remove the partition from the member.
 			if _, exists := partition2AllPotentialConsumers[partition]; !exists {
-				// if this topic partition of this consumer no longer exists remove it from currentAssignment of the consumer
-				updatedPartitions = removeTopicPartitionFromMemberAssignments(updatedPartitions, partition)
-				delete(currentPartitionConsumer, partition)
-			} else if _, exists := topics[partition.Topic]; !exists {
-				// if this partition cannot remain assigned to its current consumer because the consumer
-				// is no longer subscribed to its topic remove it from currentAssignment of the consumer
-				updatedPartitions = removeTopicPartitionFromMemberAssignments(updatedPartitions, partition)
-			} else {
-				// otherwise, remove the topic partition from those that need to be assigned only if
-				// its current consumer is still subscribed to its topic (because it is already assigned
-				// and we would want to preserve that assignment as much as possible)
-				unassignedPartitions = removeTopicPartitionFromMemberAssignments(unassignedPartitions, partition)
+				continue
 			}
+			delete(unvisitedPartitions, partition)
+			currentPartitionConsumers[partition] = memberID
+
+			if !strsContains(members[memberID].Topics, partition.Topic) {
+				unassignedPartitions = append(unassignedPartitions, partition)
+				continue
+			}
+			keepPartitions = append(keepPartitions, partition)
 		}
-		currentAssignment[memberID] = updatedPartitions
+		currentAssignment[memberID] = keepPartitions
+	}
+	for unvisited := range unvisitedPartitions {
+		unassignedPartitions = append(unassignedPartitions, unvisited)
 	}
 
+	// sort the topic partitions in order of priority for reassignment
+	sortedPartitions := sortPartitions(currentAssignment, prevAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions)
+
 	// at this point we have preserved all valid topic partition to consumer assignments and removed
 	// all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
 	// to consumers so that the topic partition assignments are as balanced as possible.
 
 	// an ascending sorted set of consumers based on how many topic partitions are already assigned to them
 	sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
-	s.balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)
+	s.balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumers)
 
 	// Assemble plan
 	plan := make(BalanceStrategyPlan, len(currentAssignment))
@@ -277,6 +268,15 @@ func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetad
 	return plan, nil
 }
 
+func strsContains(s []string, value string) bool { // strsContains reports whether value occurs in s.
+	for _, entry := range s { // linear scan; a nil or empty s performs no iterations
+		if entry == value { // exact string equality, no case folding or trimming
+			return true // stop at the first match
+		}
+	}
+	return false // no entry matched (also the result for a nil or empty slice)
+}
+
 // Balance assignments across consumers for maximum fairness and stickiness.
 func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedPartitions []topicPartitionAssignment, unassignedPartitions []topicPartitionAssignment, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) {
 	initializing := false
@@ -601,28 +601,23 @@ func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, par
 			member := pq[0]
 
 			// partitions that were assigned to a different consumer last time
-			var prevPartition *topicPartitionAssignment
 			var prevPartitionIndex int
 			for i, partition := range member.assignments {
 				if _, exists := partitionsWithADifferentPreviousAssignment[partition]; exists {
-					prevPartition = &partition
 					prevPartitionIndex = i
 					break
 				}
 			}
 
-			if prevPartition != nil {
-				// if there is a partition on this consumer that was assigned to another consumer before mark it as good options for reassignment
-				member.assignments = append(member.assignments[:prevPartitionIndex], member.assignments[prevPartitionIndex+1:]...)
-				sortedPartitions = append(sortedPartitions, *prevPartition)
-				delete(unassignedPartitions, *prevPartition)
-				heap.Fix(&pq, 0)
-			} else if len(member.assignments) > 0 {
-				// otherwise, mark any other one of the current partitions as a reassignment candidate
-				partition := member.assignments[0]
-				member.assignments = append(member.assignments[:0], member.assignments[1:]...)
+			if len(member.assignments) > 0 {
+				partition := member.assignments[prevPartitionIndex]
 				sortedPartitions = append(sortedPartitions, partition)
 				delete(unassignedPartitions, partition)
+				if prevPartitionIndex == 0 {
+					member.assignments = member.assignments[1:]
+				} else {
+					member.assignments = append(member.assignments[:prevPartitionIndex], member.assignments[prevPartitionIndex+1:]...)
+				}
 				heap.Fix(&pq, 0)
 			} else {
 				heap.Pop(&pq)

+ 34 - 0
balance_strategy_test.go

@@ -1188,6 +1188,40 @@ func Test_stickyBalanceStrategy_Plan(t *testing.T) {
 				},
 			},
 		},
+		{
+			name: "One consumer that is no longer subscribed to a topic that it had previously been consuming from",
+			args: args{
+				members: map[string]ConsumerGroupMemberMetadata{
+					"consumer1": ConsumerGroupMemberMetadata{
+						Topics:   []string{"topic2"},
+						UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": []int32{0}}, 1),
+					},
+				},
+				topics: map[string][]int32{
+					"topic1": []int32{0},
+					"topic2": []int32{0},
+				},
+			},
+		},
+		{
+			name: "Two consumers where one is no longer interested in consuming from a topic that it had been consuming from",
+			args: args{
+				members: map[string]ConsumerGroupMemberMetadata{
+					"consumer1": ConsumerGroupMemberMetadata{
+						Topics:   []string{"topic2"},
+						UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": []int32{0}}, 1),
+					},
+					"consumer2": ConsumerGroupMemberMetadata{
+						Topics:   []string{"topic1", "topic2"},
+						UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": []int32{1}}, 1),
+					},
+				},
+				topics: map[string][]int32{
+					"topic1": []int32{0, 1},
+					"topic2": []int32{0, 1},
+				},
+			},
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {