Browse Source

Add sticky partition assignor

Scott Kidder 4 năm trước cách đây
mục cha
commit
01915cf82e

+ 898 - 0
balance_strategy.go

@@ -5,6 +5,10 @@ import (
 	"sort"
 )
 
+const (
+	defaultGeneration = -1
+)
+
 // BalanceStrategyPlan is the results of any BalanceStrategy.Plan attempt.
 // It contains an allocation of topic/partitions by memberID in the form of
 // a `memberID -> topic -> partitions` map.
@@ -68,6 +72,19 @@ var BalanceStrategyRoundRobin = &balanceStrategy{
 	},
 }
 
+// BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments
+// while maintain a balanced partition distribution.
+// Example with topic T with six partitions (0..5) and two members (M1, M2):
+//   M1: {T: [0, 2, 4]}
+//   M2: {T: [1, 3, 5]}
+//
+// On reassignment with an additional consumer, you might get an assignment plan like:
+//   M1: {T: [0, 2]}
+//   M2: {T: [1, 3]}
+//   M3: {T: [4, 5]}
+//
+var BalanceStrategySticky = &stickyBalanceStrategy{}
+
 // --------------------------------------------------------------------
 
 type balanceStrategy struct {
@@ -127,3 +144,884 @@ func balanceStrategyHashValue(vv ...string) uint32 {
 	}
 	return h
 }
+
+type stickyBalanceStrategy struct {
+	movements partitionMovements
+}
+
+// Name implements BalanceStrategy.
+func (s *stickyBalanceStrategy) Name() string { return "sticky" }
+
+// Plan implements BalanceStrategy.
+func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
+	// track partition movements during generation of the partition assignment plan
+	s.movements = partitionMovements{
+		Movements:                 make(map[topicPartitionAssignment]consumerPair),
+		PartitionMovementsByTopic: make(map[string]map[consumerPair]map[topicPartitionAssignment]bool),
+	}
+
+	// prepopulate the current assignment state from userdata on the consumer group members
+	currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members)
+	if err != nil {
+		return nil, err
+	}
+
+	// determine if we're dealing with a completely fresh assignment, or if there's existing assignment state
+	isFreshAssignment := false
+	if len(currentAssignment) == 0 {
+		isFreshAssignment = true
+	}
+
+	// create a mapping of all current topic partitions and the consumers that can be assigned to them
+	partition2AllPotentialConsumers := make(map[topicPartitionAssignment][]string)
+	for topic, partitions := range topics {
+		for _, partition := range partitions {
+			partition2AllPotentialConsumers[topicPartitionAssignment{Topic: topic, Partition: partition}] = make([]string, 0)
+		}
+	}
+
+	// create a mapping of all consumers to all potential topic partitions that can be assigned to them
+	// also, populate the mapping of partitions to potential consumers
+	consumer2AllPotentialPartitions := make(map[string][]topicPartitionAssignment, len(members))
+	for memberID, meta := range members {
+		consumer2AllPotentialPartitions[memberID] = make([]topicPartitionAssignment, 0)
+		for _, topicSubscription := range meta.Topics {
+			// only evaluate topic subscriptions that are present in the supplied topics map
+			if _, found := topics[topicSubscription]; found {
+				for _, partition := range topics[topicSubscription] {
+					topicPartition := topicPartitionAssignment{Topic: topicSubscription, Partition: partition}
+					consumer2AllPotentialPartitions[memberID] = append(consumer2AllPotentialPartitions[memberID], topicPartition)
+					partition2AllPotentialConsumers[topicPartition] = append(partition2AllPotentialConsumers[topicPartition], memberID)
+				}
+			}
+		}
+
+		// add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist
+		if _, exists := currentAssignment[memberID]; !exists {
+			currentAssignment[memberID] = make([]topicPartitionAssignment, 0)
+		}
+	}
+
+	// create a mapping of each partition to its current consumer, where possible
+	currentPartitionConsumer := make(map[topicPartitionAssignment]string, len(currentAssignment))
+	for memberID, partitions := range currentAssignment {
+		for _, partition := range partitions {
+			currentPartitionConsumer[partition] = memberID
+		}
+	}
+
+	// sort the topic partitions in order of priority for reassignment
+	sortedPartitions := sortPartitions(currentAssignment, prevAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions)
+	unassignedPartitions := deepCopyPartitions(sortedPartitions)
+	for memberID, partitions := range currentAssignment {
+		// if a consumer that existed before (and had some partition assignments) is now removed, remove it from currentAssignment
+		if _, exists := members[memberID]; !exists {
+			for _, partition := range partitions {
+				delete(currentPartitionConsumer, partition)
+			}
+			delete(currentAssignment, memberID)
+			continue
+		}
+
+		// otherwise (the consumer still exists)
+		updatedPartitions := deepCopyPartitions(partitions)
+		for _, partition := range partitions {
+			if _, exists := partition2AllPotentialConsumers[partition]; !exists {
+				// if this topic partition of this consumer no longer exists remove it from currentAssignment of the consumer
+				updatedPartitions = removeTopicPartitionFromMemberAssignments(updatedPartitions, partition)
+				delete(currentPartitionConsumer, partition)
+			} else if _, exists := topics[partition.Topic]; !exists {
+				// if this partition cannot remain assigned to its current consumer because the consumer
+				// is no longer subscribed to its topic remove it from currentAssignment of the consumer
+				updatedPartitions = removeTopicPartitionFromMemberAssignments(updatedPartitions, partition)
+			} else {
+				// otherwise, remove the topic partition from those that need to be assigned only if
+				// its current consumer is still subscribed to its topic (because it is already assigned
+				// and we would want to preserve that assignment as much as possible)
+				unassignedPartitions = removeTopicPartitionFromMemberAssignments(unassignedPartitions, partition)
+			}
+		}
+		currentAssignment[memberID] = updatedPartitions
+	}
+
+	// at this point we have preserved all valid topic partition to consumer assignments and removed
+	// all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
+	// to consumers so that the topic partition assignments are as balanced as possible.
+
+	// an ascending sorted set of consumers based on how many topic partitions are already assigned to them
+	sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
+	s.balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)
+
+	// Assemble plan
+	plan := make(BalanceStrategyPlan, len(currentAssignment))
+	for memberID, assignments := range currentAssignment {
+		if len(assignments) == 0 {
+			plan[memberID] = make(map[string][]int32, 0)
+		} else {
+			for _, assignment := range assignments {
+				plan.Add(memberID, assignment.Topic, assignment.Partition)
+			}
+		}
+	}
+	return plan, nil
+}
+
+// Balance assignments across consumers for maximum fairness and stickiness.
+func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedPartitions []topicPartitionAssignment, unassignedPartitions []topicPartitionAssignment, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) {
+	initializing := false
+	if len(sortedCurrentSubscriptions) == 0 || len(currentAssignment[sortedCurrentSubscriptions[0]]) == 0 {
+		initializing = true
+	}
+
+	// assign all unassigned partitions
+	for _, partition := range unassignedPartitions {
+		// skip if there is no potential consumer for the partition
+		if len(partition2AllPotentialConsumers[partition]) == 0 {
+			continue
+		}
+		sortedCurrentSubscriptions = assignPartition(partition, sortedCurrentSubscriptions, currentAssignment, consumer2AllPotentialPartitions, currentPartitionConsumer)
+	}
+
+	// narrow down the reassignment scope to only those partitions that can actually be reassigned
+	for partition := range partition2AllPotentialConsumers {
+		if !canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
+			sortedPartitions = removeTopicPartitionFromMemberAssignments(sortedPartitions, partition)
+		}
+	}
+
+	// narrow down the reassignment scope to only those consumers that are subject to reassignment
+	fixedAssignments := make(map[string][]topicPartitionAssignment)
+	for memberID := range consumer2AllPotentialPartitions {
+		if !canConsumerParticipateInReassignment(memberID, currentAssignment, consumer2AllPotentialPartitions, partition2AllPotentialConsumers) {
+			fixedAssignments[memberID] = currentAssignment[memberID]
+			delete(currentAssignment, memberID)
+			sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
+		}
+	}
+
+	// create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
+	preBalanceAssignment := deepCopyAssignment(currentAssignment)
+	preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer))
+	for k, v := range currentPartitionConsumer {
+		preBalancePartitionConsumers[k] = v
+	}
+
+	reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)
+
+	// if we are not preserving existing assignments and we have made changes to the current assignment
+	// make sure we are getting a more balanced assignment; otherwise, revert to previous assignment
+	if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) {
+		currentAssignment = deepCopyAssignment(preBalanceAssignment)
+		currentPartitionConsumer = make(map[topicPartitionAssignment]string, len(preBalancePartitionConsumers))
+		for k, v := range preBalancePartitionConsumers {
+			currentPartitionConsumer[k] = v
+		}
+	}
+
+	// add the fixed assignments (those that could not change) back
+	for consumer, assignments := range fixedAssignments {
+		currentAssignment[consumer] = assignments
+	}
+}
+
+// Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs.
+// A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
+// Lower balance score indicates a more balanced assignment.
+func getBalanceScore(assignment map[string][]topicPartitionAssignment) int {
+	consumer2AssignmentSize := make(map[string]int, len(assignment))
+	for memberID, partitions := range assignment {
+		consumer2AssignmentSize[memberID] = len(partitions)
+	}
+
+	var score float64
+	for memberID, consumerAssignmentSize := range consumer2AssignmentSize {
+		delete(consumer2AssignmentSize, memberID)
+		for _, otherConsumerAssignmentSize := range consumer2AssignmentSize {
+			score += math.Abs(float64(consumerAssignmentSize - otherConsumerAssignmentSize))
+		}
+	}
+	return int(score)
+}
+
+// Determine whether the current assignment plan is balanced.
+func isBalanced(currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, allSubscriptions map[string][]topicPartitionAssignment) bool {
+	sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
+	min := len(currentAssignment[sortedCurrentSubscriptions[0]])
+	max := len(currentAssignment[sortedCurrentSubscriptions[len(sortedCurrentSubscriptions)-1]])
+	if min >= max-1 {
+		// if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true
+		return true
+	}
+
+	// create a mapping from partitions to the consumer assigned to them
+	allPartitions := make(map[topicPartitionAssignment]string)
+	for memberID, partitions := range currentAssignment {
+		for _, partition := range partitions {
+			if _, exists := allPartitions[partition]; exists {
+				Logger.Printf("Topic %s Partition %d is assigned more than one consumer", partition.Topic, partition.Partition)
+			}
+			allPartitions[partition] = memberID
+		}
+	}
+
+	// for each consumer that does not have all the topic partitions it can get make sure none of the topic partitions it
+	// could but did not get cannot be moved to it (because that would break the balance)
+	for _, memberID := range sortedCurrentSubscriptions {
+		consumerPartitions := currentAssignment[memberID]
+		consumerPartitionCount := len(consumerPartitions)
+
+		// skip if this consumer already has all the topic partitions it can get
+		if consumerPartitionCount == len(allSubscriptions[memberID]) {
+			continue
+		}
+
+		// otherwise make sure it cannot get any more
+		potentialTopicPartitions := allSubscriptions[memberID]
+		for _, partition := range potentialTopicPartitions {
+			if !memberAssignmentsIncludeTopicPartition(currentAssignment[memberID], partition) {
+				otherConsumer := allPartitions[partition]
+				otherConsumerPartitionCount := len(currentAssignment[otherConsumer])
+				if consumerPartitionCount < otherConsumerPartitionCount {
+					return false
+				}
+			}
+		}
+	}
+	return true
+}
+
+// Reassign all topic partitions that need reassignment until balanced.
+func (s *stickyBalanceStrategy) performReassignments(reassignablePartitions []topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) bool {
+	reassignmentPerformed := false
+	modified := false
+
+	// repeat reassignment until no partition can be moved to improve the balance
+	for {
+		modified = false
+		// reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
+		// until the full list is processed or a balance is achieved
+		for _, partition := range reassignablePartitions {
+			if isBalanced(currentAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions) {
+				break
+			}
+
+			// the partition must have at least two consumers
+			if len(partition2AllPotentialConsumers[partition]) <= 1 {
+				Logger.Printf("Expected more than one potential consumer for partition %s topic %d", partition.Topic, partition.Partition)
+			}
+
+			// the partition must have a consumer
+			consumer := currentPartitionConsumer[partition]
+			if consumer == "" {
+				Logger.Printf("Expected topic %s partition %d to be assigned to a consumer", partition.Topic, partition.Partition)
+			}
+
+			if _, exists := prevAssignment[partition]; exists {
+				if len(currentAssignment[consumer]) > (len(currentAssignment[prevAssignment[partition].MemberID]) + 1) {
+					sortedCurrentSubscriptions = s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, prevAssignment[partition].MemberID)
+					reassignmentPerformed = true
+					modified = true
+					continue
+				}
+			}
+
+			// check if a better-suited consumer exists for the partition; if so, reassign it
+			for _, otherConsumer := range partition2AllPotentialConsumers[partition] {
+				if len(currentAssignment[consumer]) > (len(currentAssignment[otherConsumer]) + 1) {
+					sortedCurrentSubscriptions = s.reassignPartitionToNewConsumer(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions)
+					reassignmentPerformed = true
+					modified = true
+					break
+				}
+
+			}
+		}
+		if !modified {
+			return reassignmentPerformed
+		}
+	}
+}
+
+// Identify a new consumer for a topic partition and reassign it.
+func (s *stickyBalanceStrategy) reassignPartitionToNewConsumer(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []string {
+	for _, anotherConsumer := range sortedCurrentSubscriptions {
+		if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[anotherConsumer], partition) {
+			return s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, anotherConsumer)
+		}
+	}
+	return sortedCurrentSubscriptions
+}
+
+// Reassign a specific partition to a new consumer
+func (s *stickyBalanceStrategy) reassignPartition(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, newConsumer string) []string {
+	consumer := currentPartitionConsumer[partition]
+	// find the correct partition movement considering the stickiness requirement
+	partitionToBeMoved := s.movements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer)
+	return s.processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer)
+}
+
+// Track the movement of a topic partition after assignment
+func (s *stickyBalanceStrategy) processPartitionMovement(partition topicPartitionAssignment, newConsumer string, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
+	oldConsumer := currentPartitionConsumer[partition]
+	s.movements.movePartition(partition, oldConsumer, newConsumer)
+
+	currentAssignment[oldConsumer] = removeTopicPartitionFromMemberAssignments(currentAssignment[oldConsumer], partition)
+	currentAssignment[newConsumer] = append(currentAssignment[newConsumer], partition)
+	currentPartitionConsumer[partition] = newConsumer
+	return sortMemberIDsByPartitionAssignments(currentAssignment)
+}
+
+// Determine whether a specific consumer should be considered for topic partition assignment.
+func canConsumerParticipateInReassignment(memberID string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
+	currentPartitions := currentAssignment[memberID]
+	currentAssignmentSize := len(currentPartitions)
+	maxAssignmentSize := len(consumer2AllPotentialPartitions[memberID])
+	if currentAssignmentSize > maxAssignmentSize {
+		Logger.Printf("The consumer %s is assigned more partitions than the maximum possible", memberID)
+	}
+	if currentAssignmentSize < maxAssignmentSize {
+		// if a consumer is not assigned all its potential partitions it is subject to reassignment
+		return true
+	}
+	for _, partition := range currentPartitions {
+		if canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
+			return true
+		}
+	}
+	return false
+}
+
+// Only consider reassigning those topic partitions that have two or more potential consumers.
+func canTopicPartitionParticipateInReassignment(partition topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
+	return len(partition2AllPotentialConsumers[partition]) >= 2
+}
+
+// The assignment should improve the overall balance of the partition assignments to consumers.
+func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscriptions []string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
+	updatedSubscriptions := make([]string, len(sortedCurrentSubscriptions))
+	for i, s := range sortedCurrentSubscriptions {
+		updatedSubscriptions[i] = s
+	}
+	i := 0
+	for _, memberID := range sortedCurrentSubscriptions {
+		if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[memberID], partition) {
+			updatedSubscriptions = removeIndexFromStringSlice(updatedSubscriptions, i)
+			currentAssignment[memberID] = append(currentAssignment[memberID], partition)
+			currentPartitionConsumer[partition] = memberID
+			updatedSubscriptions = append(updatedSubscriptions, memberID)
+			break
+		}
+		i++
+	}
+	return updatedSubscriptions
+}
+
+// Deserialize topic partition assignment data to aid with creation of a sticky assignment.
+func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
+	userDataV1 := &StickyAssignorUserDataV1{}
+	if err := decode(userDataBytes, userDataV1); err != nil {
+		userDataV0 := &StickyAssignorUserDataV0{}
+		if err := decode(userDataBytes, userDataV0); err != nil {
+			return nil, err
+		}
+		return userDataV0, nil
+	}
+	return userDataV1, nil
+}
+
+// filterAssignedPartitions returns a map of consumer group members to their list of previously-assigned topic partitions, limited
+// to those topic partitions currently reported by the Kafka cluster.
+func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) map[string][]topicPartitionAssignment {
+	assignments := deepCopyAssignment(currentAssignment)
+	for memberID, partitions := range assignments {
+		// perform in-place filtering
+		i := 0
+		for _, partition := range partitions {
+			if _, exists := partition2AllPotentialConsumers[partition]; exists {
+				partitions[i] = partition
+				i++
+			}
+		}
+		assignments[memberID] = partitions[:i]
+	}
+	return assignments
+}
+
+func removeValueFromStringSlice(s []string, e string) []string {
+	for i, v := range s {
+		if v == e {
+			s = append(s[:i], s[i+1:]...)
+			break
+		}
+	}
+	return s
+}
+
+func removeIndexFromStringSlice(s []string, i int) []string {
+	if len(s) == 0 {
+		return s
+	}
+	return append(s[:i], s[i+1:]...)
+}
+
+func removeTopicPartitionFromMemberAssignments(assignments []topicPartitionAssignment, topic topicPartitionAssignment) []topicPartitionAssignment {
+	for i, assignment := range assignments {
+		if assignment == topic {
+			return append(assignments[:i], assignments[i+1:]...)
+		}
+	}
+	return assignments
+}
+
+func memberAssignmentsIncludeTopicPartition(assignments []topicPartitionAssignment, topic topicPartitionAssignment) bool {
+	for _, assignment := range assignments {
+		if assignment == topic {
+			return true
+		}
+	}
+	return false
+}
+
+func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment {
+	sortedPartitions := make([]topicPartitionAssignment, 0)
+	if !isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions) {
+		// if this is a reassignment and the subscriptions are identical (all consumers can consumer from all topics)
+		// then we just need to simply list partitions in a round robin fashion (from consumers with
+		// most assigned partitions to those with least)
+		assignments := filterAssignedPartitions(currentAssignment, partition2AllPotentialConsumers)
+
+		// sortedMemberIDs contains a descending-sorted list of consumers based on how many valid partitions are currently assigned to them
+		sortedMemberIDs := sortMemberIDsByPartitionAssignments(assignments)
+		for i := len(sortedMemberIDs)/2 - 1; i >= 0; i-- {
+			opp := len(sortedMemberIDs) - 1 - i
+			sortedMemberIDs[i], sortedMemberIDs[opp] = sortedMemberIDs[opp], sortedMemberIDs[i]
+		}
+		for {
+			// loop until no consumer-group members remain
+			if len(sortedMemberIDs) == 0 {
+				break
+			}
+			updatedMemberIDs := make([]string, 0)
+			for _, memberID := range sortedMemberIDs {
+				// partitions that were assigned to a different consumer last time
+				prevPartitions := make([]topicPartitionAssignment, 0)
+				for partition := range partitionsWithADifferentPreviousAssignment {
+					// from partitions that had a different consumer before, keep only those that are assigned to this consumer now
+					if memberAssignmentsIncludeTopicPartition(assignments[memberID], partition) {
+						prevPartitions = append(prevPartitions, partition)
+					}
+				}
+
+				if len(prevPartitions) > 0 {
+					// if there is a partition on this consumer that was assigned to another consumer before mark it as good options for reassignment
+					partition := prevPartitions[0]
+					prevPartitions = append(prevPartitions[:0], prevPartitions[1:]...)
+					assignments[memberID] = removeTopicPartitionFromMemberAssignments(assignments[memberID], partition)
+					sortedPartitions = append(sortedPartitions, partition)
+					updatedMemberIDs = append(updatedMemberIDs, memberID)
+				} else if len(assignments[memberID]) > 0 {
+					// otherwise, mark any other one of the current partitions as a reassignment candidate
+					partition := assignments[memberID][0]
+					assignments[memberID] = append(assignments[memberID][:0], assignments[memberID][1:]...)
+					sortedPartitions = append(sortedPartitions, partition)
+					updatedMemberIDs = append(updatedMemberIDs, memberID)
+				}
+			}
+			sortedMemberIDs = updatedMemberIDs
+		}
+
+		for partition := range partition2AllPotentialConsumers {
+			found := false
+			for _, p := range sortedPartitions {
+				if partition == p {
+					found = true
+					break
+				}
+			}
+			if !found {
+				sortedPartitions = append(sortedPartitions, partition)
+			}
+		}
+	} else {
+		// an ascending sorted set of topic partitions based on how many consumers can potentially use them
+		sortedPartitions = sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers)
+	}
+	return sortedPartitions
+}
+
+func sortMemberIDsByPartitionAssignments(assignments map[string][]topicPartitionAssignment) []string {
+	// sort the members by the number of partition assignments in ascending order
+	sortedMemberIDs := make([]string, 0, len(assignments))
+	for memberID := range assignments {
+		sortedMemberIDs = append(sortedMemberIDs, memberID)
+	}
+	sort.SliceStable(sortedMemberIDs, func(i, j int) bool {
+		ret := len(assignments[sortedMemberIDs[i]]) - len(assignments[sortedMemberIDs[j]])
+		if ret == 0 {
+			return sortedMemberIDs[i] < sortedMemberIDs[j]
+		}
+		return len(assignments[sortedMemberIDs[i]]) < len(assignments[sortedMemberIDs[j]])
+	})
+	return sortedMemberIDs
+}
+
+func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers map[topicPartitionAssignment][]string) []topicPartitionAssignment {
+	// sort the members by the number of partition assignments in descending order
+	sortedPartionIDs := make([]topicPartitionAssignment, 0, len(partition2AllPotentialConsumers))
+	for partition := range partition2AllPotentialConsumers {
+		sortedPartionIDs = append(sortedPartionIDs, partition)
+	}
+	sort.SliceStable(sortedPartionIDs, func(i, j int) bool {
+		return len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) > len(partition2AllPotentialConsumers[sortedPartionIDs[j]])
+	})
+	return sortedPartionIDs
+}
+
+func deepCopyPartitions(src []topicPartitionAssignment) []topicPartitionAssignment {
+	dst := make([]topicPartitionAssignment, len(src))
+	for i, partition := range src {
+		dst[i] = partition
+	}
+	return dst
+}
+
+func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
+	copy := make(map[string][]topicPartitionAssignment, len(assignment))
+	for memberID, subscriptions := range assignment {
+		copy[memberID] = append(subscriptions[:0:0], subscriptions...)
+	}
+	return copy
+}
+
+func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
+	curMembers := make(map[string]int)
+	for _, cur := range partition2AllPotentialConsumers {
+		if len(curMembers) == 0 {
+			for _, curMembersElem := range cur {
+				curMembers[curMembersElem]++
+			}
+			continue
+		}
+
+		if len(curMembers) != len(cur) {
+			return false
+		}
+
+		yMap := make(map[string]int)
+		for _, yElem := range cur {
+			yMap[yElem]++
+		}
+
+		for curMembersMapKey, curMembersMapVal := range curMembers {
+			if yMap[curMembersMapKey] != curMembersMapVal {
+				return false
+			}
+		}
+	}
+
+	curPartitions := make(map[topicPartitionAssignment]int)
+	for _, cur := range consumer2AllPotentialPartitions {
+		if len(curPartitions) == 0 {
+			for _, curPartitionElem := range cur {
+				curPartitions[curPartitionElem]++
+			}
+			continue
+		}
+
+		if len(curPartitions) != len(cur) {
+			return false
+		}
+
+		yMap := make(map[topicPartitionAssignment]int)
+		for _, yElem := range cur {
+			yMap[yElem]++
+		}
+
+		for curMembersMapKey, curMembersMapVal := range curPartitions {
+			if yMap[curMembersMapKey] != curMembersMapVal {
+				return false
+			}
+		}
+	}
+	return true
+}
+
+// We need to process subscriptions' user data with each consumer's reported generation in mind
+// higher generations overwrite lower generations in case of a conflict
+// note that a conflict could exist only if user data is for different generations
+func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {
+	currentAssignment := make(map[string][]topicPartitionAssignment)
+	prevAssignment := make(map[topicPartitionAssignment]consumerGenerationPair)
+
+	// for each partition we create a sorted map of its consumers by generation
+	sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
+	for memberID, meta := range members {
+		consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
+		if err != nil {
+			return nil, nil, err
+		}
+		for _, partition := range consumerUserData.partitions() {
+			if consumers, exists := sortedPartitionConsumersByGeneration[partition]; exists {
+				if consumerUserData.hasGeneration() {
+					if _, generationExists := consumers[consumerUserData.generation()]; generationExists {
+						// same partition is assigned to two consumers during the same rebalance.
+						// log a warning and skip this record
+						Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, consumerUserData.generation())
+						continue
+					} else {
+						consumers[consumerUserData.generation()] = memberID
+					}
+				} else {
+					consumers[defaultGeneration] = memberID
+				}
+			} else {
+				generation := defaultGeneration
+				if consumerUserData.hasGeneration() {
+					generation = consumerUserData.generation()
+				}
+				sortedPartitionConsumersByGeneration[partition] = map[int]string{generation: memberID}
+			}
+		}
+	}
+
+	// prevAssignment holds the prior ConsumerGenerationPair (before current) of each partition
+	// current and previous consumers are the last two consumers of each partition in the above sorted map
+	for partition, consumers := range sortedPartitionConsumersByGeneration {
+		// sort consumers by generation in decreasing order
+		var generations []int
+		for generation := range consumers {
+			generations = append(generations, generation)
+		}
+		sort.Sort(sort.Reverse(sort.IntSlice(generations)))
+
+		if len(generations) > 0 {
+			consumer := consumers[generations[0]]
+			currentConsumerAssignments, exists := currentAssignment[consumer]
+			if !exists {
+				currentConsumerAssignments = []topicPartitionAssignment{partition}
+			} else {
+				currentConsumerAssignments = append(currentConsumerAssignments, partition)
+			}
+			currentAssignment[consumer] = currentConsumerAssignments
+
+			// check for previous assignment, if any
+			if len(generations) > 1 {
+				prevAssignment[partition] = consumerGenerationPair{
+					MemberID:   consumers[generations[1]],
+					Generation: generations[1],
+				}
+			}
+		}
+	}
+	return currentAssignment, prevAssignment, nil
+}
+
+type consumerGenerationPair struct {
+	MemberID   string
+	Generation int
+}
+
+// consumerPair represents a pair of Kafka consumer ids involved in a partition reassignment.
+type consumerPair struct {
+	SrcMemberID string
+	DstMemberID string
+}
+
+// partitionMovements maintains some data structures to simplify lookup of partition movements among consumers.
+type partitionMovements struct {
+	PartitionMovementsByTopic map[string]map[consumerPair]map[topicPartitionAssignment]bool
+	Movements                 map[topicPartitionAssignment]consumerPair
+}
+
+func (p *partitionMovements) removeMovementRecordOfPartition(partition topicPartitionAssignment) consumerPair {
+	pair := p.Movements[partition]
+	delete(p.Movements, partition)
+
+	partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
+	delete(partitionMovementsForThisTopic[pair], partition)
+	if len(partitionMovementsForThisTopic[pair]) == 0 {
+		delete(partitionMovementsForThisTopic, pair)
+	}
+	if len(p.PartitionMovementsByTopic[partition.Topic]) == 0 {
+		delete(p.PartitionMovementsByTopic, partition.Topic)
+	}
+	return pair
+}
+
+func (p *partitionMovements) addPartitionMovementRecord(partition topicPartitionAssignment, pair consumerPair) {
+	p.Movements[partition] = pair
+	if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
+		p.PartitionMovementsByTopic[partition.Topic] = make(map[consumerPair]map[topicPartitionAssignment]bool)
+	}
+	partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
+	if _, exists := partitionMovementsForThisTopic[pair]; !exists {
+		partitionMovementsForThisTopic[pair] = make(map[topicPartitionAssignment]bool)
+	}
+	partitionMovementsForThisTopic[pair][partition] = true
+}
+
+func (p *partitionMovements) movePartition(partition topicPartitionAssignment, oldConsumer, newConsumer string) {
+	pair := consumerPair{
+		SrcMemberID: oldConsumer,
+		DstMemberID: newConsumer,
+	}
+	if _, exists := p.Movements[partition]; exists {
+		// this partition has previously moved
+		existingPair := p.removeMovementRecordOfPartition(partition)
+		if existingPair.DstMemberID != oldConsumer {
+			Logger.Printf("Existing pair DstMemberID %s was not equal to the oldConsumer ID %s", existingPair.DstMemberID, oldConsumer)
+		}
+		if existingPair.SrcMemberID != newConsumer {
+			// the partition is not moving back to its previous consumer
+			p.addPartitionMovementRecord(partition, consumerPair{
+				SrcMemberID: existingPair.SrcMemberID,
+				DstMemberID: newConsumer,
+			})
+		}
+	} else {
+		p.addPartitionMovementRecord(partition, pair)
+	}
+}
+
+func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicPartitionAssignment, oldConsumer, newConsumer string) topicPartitionAssignment {
+	if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
+		return partition
+	}
+	if _, exists := p.Movements[partition]; exists {
+		// this partition has previously moved
+		if oldConsumer != p.Movements[partition].DstMemberID {
+			Logger.Printf("Partition movement DstMemberID %s was not equal to the oldConsumer ID %s", p.Movements[partition].DstMemberID, oldConsumer)
+		}
+		oldConsumer = p.Movements[partition].SrcMemberID
+	}
+
+	partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
+	reversePair := consumerPair{
+		SrcMemberID: newConsumer,
+		DstMemberID: oldConsumer,
+	}
+	if _, exists := partitionMovementsForThisTopic[reversePair]; !exists {
+		return partition
+	}
+	var reversePairPartition topicPartitionAssignment
+	for otherPartition := range partitionMovementsForThisTopic[reversePair] {
+		reversePairPartition = otherPartition
+	}
+	return reversePairPartition
+}
+
+func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) {
+	if src == dst {
+		return currentPath, false
+	}
+	if len(pairs) == 0 {
+		return currentPath, false
+	}
+	for _, pair := range pairs {
+		if src == pair.SrcMemberID && dst == pair.DstMemberID {
+			currentPath = append(currentPath, src, dst)
+			return currentPath, true
+		}
+	}
+
+	for _, pair := range pairs {
+		if pair.SrcMemberID == src {
+			// create a deep copy of the pairs, excluding the current pair
+			reducedSet := make([]consumerPair, len(pairs)-1)
+			i := 0
+			for _, p := range pairs {
+				if p != pair {
+					reducedSet[i] = pair
+					i++
+				}
+			}
+
+			currentPath = append(currentPath, pair.SrcMemberID)
+			return p.isLinked(pair.DstMemberID, dst, reducedSet, currentPath)
+		}
+	}
+	return currentPath, false
+}
+
+func (p *partitionMovements) in(cycle []string, cycles [][]string) bool {
+	superCycle := make([]string, len(cycle)-1)
+	for i := 0; i < len(cycle)-1; i++ {
+		superCycle[i] = cycle[i]
+	}
+	for _, c := range cycle {
+		superCycle = append(superCycle, c)
+	}
+	for _, foundCycle := range cycles {
+		if len(foundCycle) == len(cycle) && indexOfSubList(superCycle, foundCycle) != -1 {
+			return true
+		}
+	}
+	return false
+}
+
+func (p *partitionMovements) hasCycles(pairs []consumerPair) bool {
+	cycles := make([][]string, 0)
+	for _, pair := range pairs {
+		// create a deep copy of the pairs, excluding the current pair
+		reducedPairs := make([]consumerPair, len(pairs)-1)
+		i := 0
+		for _, p := range pairs {
+			if p != pair {
+				reducedPairs[i] = pair
+				i++
+			}
+		}
+		if path, linked := p.isLinked(pair.DstMemberID, pair.SrcMemberID, reducedPairs, []string{pair.SrcMemberID}); linked {
+			if !p.in(path, cycles) {
+				cycles = append(cycles, path)
+				Logger.Printf("A cycle of length %d was found: %v", len(path)-1, path)
+			}
+		}
+	}
+
+	// for now we want to make sure there is no partition movements of the same topic between a pair of consumers.
+	// the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized
+	// tests with the given sticky algorithm) that it should not worth the added complexity of handling those cases.
+	for _, cycle := range cycles {
+		if len(cycle) == 3 {
+			return true
+		}
+	}
+	return false
+}
+
+func (p *partitionMovements) isSticky() bool {
+	for topic, movements := range p.PartitionMovementsByTopic {
+		movementPairs := make([]consumerPair, len(movements))
+		i := 0
+		for pair := range movements {
+			movementPairs[i] = pair
+			i++
+		}
+		if p.hasCycles(movementPairs) {
+			Logger.Printf("Stickiness is violated for topic %s", topic)
+			Logger.Printf("Partition movements for this topic occurred among the following consumer pairs: %v", movements)
+			return false
+		}
+	}
+	return true
+}
+
+func indexOfSubList(source []string, target []string) int {
+	targetSize := len(target)
+	maxCandidate := len(source) - targetSize
+nextCand:
+	for candidate := 0; candidate <= maxCandidate; candidate++ {
+		j := candidate
+		for i := 0; i < targetSize; i++ {
+			if target[i] != source[j] {
+				// Element mismatch, try next cand
+				continue nextCand
+			}
+			j++
+		}
+		// All elements of candidate matched target
+		return candidate
+	}
+	return -1
+}

+ 2176 - 0
balance_strategy_test.go

@@ -1,8 +1,13 @@
 package sarama
 
 import (
+	"fmt"
+	"math"
+	"math/rand"
 	"reflect"
+	"sort"
 	"testing"
+	"time"
 )
 
 func TestBalanceStrategyRange(t *testing.T) {
@@ -100,3 +105,2174 @@ func TestBalanceStrategyRoundRobin(t *testing.T) {
 		}
 	}
 }
+
+func Test_deserializeTopicPartitionAssignment(t *testing.T) {
+	type args struct {
+		userDataBytes []byte
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    StickyAssignorUserData
+		wantErr bool
+	}{
+		{
+			name: "Nil userdata bytes",
+			args: args{},
+			want: &StickyAssignorUserDataV1{},
+		},
+		{
+			name: "Non-empty invalid userdata bytes",
+			args: args{
+				userDataBytes: []byte{
+					0x00, 0x00,
+					0x00, 0x00, 0x00, 0x01,
+					0x00, 0x03, 'f', 'o', 'o',
+				},
+			},
+			wantErr: true,
+		},
+		{
+			name: "Valid v0 userdata bytes",
+			args: args{
+				userDataBytes: []byte{
+					0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
+					0x33, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
+					0x05,
+				},
+			},
+			want: &StickyAssignorUserDataV0{
+				Topics: map[string][]int32{"t03": {5}},
+				topicPartitions: []topicPartitionAssignment{
+					topicPartitionAssignment{
+						Topic:     "t03",
+						Partition: 5,
+					},
+				},
+			},
+		},
+		{
+			name: "Valid v1 userdata bytes",
+			args: args{
+				userDataBytes: []byte{
+					0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
+					0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
+					0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff,
+					0xff,
+				},
+			},
+			want: &StickyAssignorUserDataV1{
+				Topics:     map[string][]int32{"t06": {0, 4}},
+				Generation: -1,
+				topicPartitions: []topicPartitionAssignment{
+					topicPartitionAssignment{
+						Topic:     "t06",
+						Partition: 0,
+					},
+					topicPartitionAssignment{
+						Topic:     "t06",
+						Partition: 4,
+					},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := deserializeTopicPartitionAssignment(tt.args.userDataBytes)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("deserializeTopicPartitionAssignment() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("deserializeTopicPartitionAssignment() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_prepopulateCurrentAssignments(t *testing.T) {
+	type args struct {
+		members map[string]ConsumerGroupMemberMetadata
+	}
+	tests := []struct {
+		name                   string
+		args                   args
+		wantCurrentAssignments map[string][]topicPartitionAssignment
+		wantPrevAssignments    map[topicPartitionAssignment]consumerGenerationPair
+		wantErr                bool
+	}{
+		{
+			name:                   "Empty map",
+			wantCurrentAssignments: map[string][]topicPartitionAssignment{},
+			wantPrevAssignments:    map[topicPartitionAssignment]consumerGenerationPair{},
+		},
+		{
+			name: "Single consumer",
+			args: args{
+				members: map[string]ConsumerGroupMemberMetadata{
+					"c01": ConsumerGroupMemberMetadata{
+						Version: 2,
+						UserData: []byte{
+							0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
+							0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
+							0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff,
+							0xff,
+						},
+					},
+				},
+			},
+			wantCurrentAssignments: map[string][]topicPartitionAssignment{
+				"c01": []topicPartitionAssignment{
+					topicPartitionAssignment{
+						Topic:     "t06",
+						Partition: 0,
+					},
+					topicPartitionAssignment{
+						Topic:     "t06",
+						Partition: 4,
+					},
+				},
+			},
+			wantPrevAssignments: map[topicPartitionAssignment]consumerGenerationPair{},
+		},
+		{
+			name: "Duplicate consumer assignments in metadata",
+			args: args{
+				members: map[string]ConsumerGroupMemberMetadata{
+					"c01": ConsumerGroupMemberMetadata{
+						Version: 2,
+						UserData: []byte{
+							0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
+							0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
+							0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff,
+							0xff,
+						},
+					},
+					"c02": ConsumerGroupMemberMetadata{
+						Version: 2,
+						UserData: []byte{
+							0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
+							0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
+							0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff,
+							0xff,
+						},
+					},
+				},
+			},
+			wantCurrentAssignments: map[string][]topicPartitionAssignment{
+				"c01": []topicPartitionAssignment{
+					topicPartitionAssignment{
+						Topic:     "t06",
+						Partition: 0,
+					},
+					topicPartitionAssignment{
+						Topic:     "t06",
+						Partition: 4,
+					},
+				},
+			},
+			wantPrevAssignments: map[topicPartitionAssignment]consumerGenerationPair{},
+		},
+		{
+			name: "Different generations (5, 6) of consumer assignments in metadata",
+			args: args{
+				members: map[string]ConsumerGroupMemberMetadata{
+					"c01": ConsumerGroupMemberMetadata{
+						Version: 2,
+						UserData: []byte{
+							0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
+							0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
+							0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00,
+							0x05,
+						},
+					},
+					"c02": ConsumerGroupMemberMetadata{
+						Version: 2,
+						UserData: []byte{
+							0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
+							0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
+							0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00,
+							0x06,
+						},
+					},
+				},
+			},
+			wantCurrentAssignments: map[string][]topicPartitionAssignment{
+				"c01": []topicPartitionAssignment{
+					topicPartitionAssignment{
+						Topic:     "t06",
+						Partition: 0,
+					},
+					topicPartitionAssignment{
+						Topic:     "t06",
+						Partition: 4,
+					},
+				},
+			},
+			wantPrevAssignments: map[topicPartitionAssignment]consumerGenerationPair{
+				topicPartitionAssignment{
+					Topic:     "t06",
+					Partition: 0,
+				}: consumerGenerationPair{
+					Generation: 5,
+					MemberID:   "c01",
+				},
+				topicPartitionAssignment{
+					Topic:     "t06",
+					Partition: 4,
+				}: consumerGenerationPair{
+					Generation: 5,
+					MemberID:   "c01",
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			_, gotPrevAssignments, err := prepopulateCurrentAssignments(tt.args.members)
+
+			if (err != nil) != tt.wantErr {
+				t.Errorf("prepopulateCurrentAssignments() error = %v, wantErr %v", err, tt.wantErr)
+			}
+
+			if !reflect.DeepEqual(gotPrevAssignments, tt.wantPrevAssignments) {
+				t.Errorf("deserializeTopicPartitionAssignment() prevAssignments = %v, want %v", gotPrevAssignments, tt.wantPrevAssignments)
+			}
+		})
+	}
+}
+
+func Test_areSubscriptionsIdentical(t *testing.T) {
+	type args struct {
+		partition2AllPotentialConsumers map[topicPartitionAssignment][]string
+		consumer2AllPotentialPartitions map[string][]topicPartitionAssignment
+	}
+	tests := []struct {
+		name string
+		args args
+		want bool
+	}{
+		{
+			name: "Empty consumers and partitions",
+			args: args{
+				partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
+				consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
+			},
+			want: true,
+		},
+		{
+			name: "Topic partitions with identical consumer entries",
+			args: args{
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: []string{"c1", "c2", "c3"},
+					topicPartitionAssignment{Topic: "t1", Partition: 1}: []string{"c1", "c2", "c3"},
+					topicPartitionAssignment{Topic: "t1", Partition: 2}: []string{"c1", "c2", "c3"},
+				},
+				consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
+			},
+			want: true,
+		},
+		{
+			name: "Topic partitions with mixed up consumer entries",
+			args: args{
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: []string{"c1", "c2", "c3"},
+					topicPartitionAssignment{Topic: "t1", Partition: 1}: []string{"c2", "c3", "c1"},
+					topicPartitionAssignment{Topic: "t1", Partition: 2}: []string{"c3", "c1", "c2"},
+				},
+				consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
+			},
+			want: true,
+		},
+		{
+			name: "Topic partitions with different consumer entries",
+			args: args{
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: []string{"c1", "c2", "c3"},
+					topicPartitionAssignment{Topic: "t1", Partition: 1}: []string{"c2", "c3", "c1"},
+					topicPartitionAssignment{Topic: "t1", Partition: 2}: []string{"cX", "c1", "c2"},
+				},
+				consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
+			},
+			want: false,
+		},
+		{
+			name: "Topic partitions with different number of consumer entries",
+			args: args{
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: []string{"c1", "c2", "c3"},
+					topicPartitionAssignment{Topic: "t1", Partition: 1}: []string{"c2", "c3", "c1"},
+					topicPartitionAssignment{Topic: "t1", Partition: 2}: []string{"c1", "c2"},
+				},
+				consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
+			},
+			want: false,
+		},
+		{
+			name: "Consumers with identical topic partitions",
+			args: args{
+				partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
+				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+					"c2": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+					"c3": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+				},
+			},
+			want: true,
+		},
+		{
+			name: "Consumer2 with mixed up consumer entries",
+			args: args{
+				partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
+				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+					"c2": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}, topicPartitionAssignment{Topic: "t1", Partition: 0}},
+					"c3": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 2}, topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}},
+				},
+			},
+			want: true,
+		},
+		{
+			name: "Consumer2 with different consumer entries",
+			args: args{
+				partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
+				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+					"c2": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}, topicPartitionAssignment{Topic: "t1", Partition: 0}},
+					"c3": []topicPartitionAssignment{topicPartitionAssignment{Topic: "tX", Partition: 2}, topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}},
+				},
+			},
+			want: false,
+		},
+		{
+			name: "Consumer2 with different number of consumer entries",
+			args: args{
+				partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
+				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+					"c2": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}, topicPartitionAssignment{Topic: "t1", Partition: 0}},
+					"c3": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}},
+				},
+			},
+			want: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := areSubscriptionsIdentical(tt.args.partition2AllPotentialConsumers, tt.args.consumer2AllPotentialPartitions); got != tt.want {
+				t.Errorf("areSubscriptionsIdentical() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_sortMemberIDsByPartitionAssignments(t *testing.T) {
+	type args struct {
+		assignments map[string][]topicPartitionAssignment
+	}
+	tests := []struct {
+		name string
+		args args
+		want []string
+	}{
+		{
+			name: "Null assignments",
+			want: make([]string, 0),
+		},
+		{
+			name: "Single assignment",
+			args: args{
+				assignments: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 0},
+						topicPartitionAssignment{Topic: "t1", Partition: 1},
+						topicPartitionAssignment{Topic: "t1", Partition: 2},
+					},
+				},
+			},
+			want: []string{"c1"},
+		},
+		{
+			name: "Multiple assignments with different partition counts",
+			args: args{
+				assignments: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 0},
+					},
+					"c2": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 1},
+						topicPartitionAssignment{Topic: "t1", Partition: 2},
+					},
+					"c3": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 3},
+						topicPartitionAssignment{Topic: "t1", Partition: 4},
+						topicPartitionAssignment{Topic: "t1", Partition: 5},
+					},
+				},
+			},
+			want: []string{"c1", "c2", "c3"},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := sortMemberIDsByPartitionAssignments(tt.args.assignments); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("sortMemberIDsByPartitionAssignments() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_sortPartitions(t *testing.T) {
+	type args struct {
+		currentAssignment                          map[string][]topicPartitionAssignment
+		partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair
+		isFreshAssignment                          bool
+		partition2AllPotentialConsumers            map[topicPartitionAssignment][]string
+		consumer2AllPotentialPartitions            map[string][]topicPartitionAssignment
+	}
+	tests := []struct {
+		name string
+		args args
+		want []topicPartitionAssignment
+	}{
+		{
+			name: "Empty everything",
+			want: make([]topicPartitionAssignment, 0),
+		},
+		{
+			name: "Base case",
+			args: args{
+				currentAssignment: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}},
+					"c2": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 1}},
+					"c3": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 2}},
+				},
+				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+					"c2": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+					"c3": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+				},
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: []string{"c1", "c2", "c3"},
+					topicPartitionAssignment{Topic: "t1", Partition: 1}: []string{"c2", "c3", "c1"},
+					topicPartitionAssignment{Topic: "t1", Partition: 2}: []string{"c3", "c1", "c2"},
+				},
+			},
+		},
+		{
+			name: "Partitions assigned to a different consumer last time",
+			args: args{
+				currentAssignment: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}},
+				},
+				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+					"c2": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+					"c3": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+				},
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: []string{"c1", "c2", "c3"},
+					topicPartitionAssignment{Topic: "t1", Partition: 1}: []string{"c2", "c3", "c1"},
+					topicPartitionAssignment{Topic: "t1", Partition: 2}: []string{"c3", "c1", "c2"},
+				},
+				partitionsWithADifferentPreviousAssignment: map[topicPartitionAssignment]consumerGenerationPair{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: consumerGenerationPair{Generation: 1, MemberID: "c2"},
+				},
+			},
+		},
+		{
+			name: "Partitions assigned to a different consumer last time",
+			args: args{
+				currentAssignment: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}},
+					"c2": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 1}},
+				},
+				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+					"c2": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+					"c3": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+				},
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: []string{"c1", "c2", "c3"},
+					topicPartitionAssignment{Topic: "t1", Partition: 1}: []string{"c2", "c3", "c1"},
+					topicPartitionAssignment{Topic: "t1", Partition: 2}: []string{"c3", "c1", "c2"},
+				},
+				partitionsWithADifferentPreviousAssignment: map[topicPartitionAssignment]consumerGenerationPair{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: consumerGenerationPair{Generation: 1, MemberID: "c2"},
+				},
+			},
+		},
+		{
+			name: "Fresh assignment",
+			args: args{
+				isFreshAssignment: true,
+				currentAssignment: map[string][]topicPartitionAssignment{},
+				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+					"c2": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+					"c3": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}, topicPartitionAssignment{Topic: "t1", Partition: 1}, topicPartitionAssignment{Topic: "t1", Partition: 2}},
+				},
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: []string{"c1", "c2", "c3"},
+					topicPartitionAssignment{Topic: "t1", Partition: 1}: []string{"c2", "c3", "c1"},
+					topicPartitionAssignment{Topic: "t1", Partition: 2}: []string{"c3", "c1", "c2"},
+				},
+				partitionsWithADifferentPreviousAssignment: map[topicPartitionAssignment]consumerGenerationPair{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: consumerGenerationPair{Generation: 1, MemberID: "c2"},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := sortPartitions(tt.args.currentAssignment, tt.args.partitionsWithADifferentPreviousAssignment, tt.args.isFreshAssignment, tt.args.partition2AllPotentialConsumers, tt.args.consumer2AllPotentialPartitions)
+			if tt.want != nil && !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("sortPartitions() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_filterAssignedPartitions(t *testing.T) {
+	type args struct {
+		currentAssignment               map[string][]topicPartitionAssignment
+		partition2AllPotentialConsumers map[topicPartitionAssignment][]string
+	}
+	tests := []struct {
+		name string
+		args args
+		want map[string][]topicPartitionAssignment
+	}{
+		{
+			name: "All partitions accounted for",
+			args: args{
+				currentAssignment: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}},
+					"c2": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 1}},
+				},
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: []string{"c1"},
+					topicPartitionAssignment{Topic: "t1", Partition: 1}: []string{"c2"},
+				},
+			},
+			want: map[string][]topicPartitionAssignment{
+				"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}},
+				"c2": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 1}},
+			},
+		},
+		{
+			name: "One consumer using an unrecognized partition",
+			args: args{
+				currentAssignment: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}},
+					"c2": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 1}},
+				},
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: []string{"c1"},
+				},
+			},
+			want: map[string][]topicPartitionAssignment{
+				"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}},
+				"c2": []topicPartitionAssignment{},
+			},
+		},
+		{
+			name: "Interleaved consumer removal",
+			args: args{
+				currentAssignment: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}},
+					"c2": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 1}},
+					"c3": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 2}},
+				},
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: []string{"c1"},
+					topicPartitionAssignment{Topic: "t1", Partition: 2}: []string{"c3"},
+				},
+			},
+			want: map[string][]topicPartitionAssignment{
+				"c1": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 0}},
+				"c2": []topicPartitionAssignment{},
+				"c3": []topicPartitionAssignment{topicPartitionAssignment{Topic: "t1", Partition: 2}},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := filterAssignedPartitions(tt.args.currentAssignment, tt.args.partition2AllPotentialConsumers); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("filterAssignedPartitions() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_canConsumerParticipateInReassignment(t *testing.T) {
+	type args struct {
+		memberID                        string
+		currentAssignment               map[string][]topicPartitionAssignment
+		consumer2AllPotentialPartitions map[string][]topicPartitionAssignment
+		partition2AllPotentialConsumers map[topicPartitionAssignment][]string
+	}
+	tests := []struct {
+		name string
+		args args
+		want bool
+	}{
+		{
+			name: "Consumer has been assigned partitions not available to it",
+			args: args{
+				memberID: "c1",
+				currentAssignment: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 0},
+						topicPartitionAssignment{Topic: "t1", Partition: 1},
+						topicPartitionAssignment{Topic: "t1", Partition: 2},
+					},
+					"c2": []topicPartitionAssignment{},
+				},
+				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 0},
+						topicPartitionAssignment{Topic: "t1", Partition: 1},
+					},
+					"c2": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 0},
+						topicPartitionAssignment{Topic: "t1", Partition: 1},
+						topicPartitionAssignment{Topic: "t1", Partition: 2},
+					},
+				},
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: []string{"c1", "c2"},
+					topicPartitionAssignment{Topic: "t1", Partition: 1}: []string{"c1", "c2"},
+					topicPartitionAssignment{Topic: "t1", Partition: 2}: []string{"c2"},
+				},
+			},
+			want: true,
+		},
+		{
+			name: "Consumer has been assigned all available partitions",
+			args: args{
+				memberID: "c1",
+				currentAssignment: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 0},
+						topicPartitionAssignment{Topic: "t1", Partition: 1},
+					},
+				},
+				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 0},
+						topicPartitionAssignment{Topic: "t1", Partition: 1},
+					},
+				},
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: []string{"c1"},
+					topicPartitionAssignment{Topic: "t1", Partition: 1}: []string{"c1"},
+				},
+			},
+			want: false,
+		},
+		{
+			name: "Consumer has not been assigned all available partitions",
+			args: args{
+				memberID: "c1",
+				currentAssignment: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 0},
+						topicPartitionAssignment{Topic: "t1", Partition: 1},
+					},
+				},
+				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 0},
+						topicPartitionAssignment{Topic: "t1", Partition: 1},
+						topicPartitionAssignment{Topic: "t1", Partition: 2},
+					},
+				},
+				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: []string{"c1"},
+					topicPartitionAssignment{Topic: "t1", Partition: 1}: []string{"c1"},
+					topicPartitionAssignment{Topic: "t1", Partition: 2}: []string{"c1"},
+				},
+			},
+			want: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := canConsumerParticipateInReassignment(tt.args.memberID, tt.args.currentAssignment, tt.args.consumer2AllPotentialPartitions, tt.args.partition2AllPotentialConsumers); got != tt.want {
+				t.Errorf("canConsumerParticipateInReassignment() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_removeTopicPartitionFromMemberAssignments(t *testing.T) {
+	type args struct {
+		assignments []topicPartitionAssignment
+		topic       topicPartitionAssignment
+	}
+	tests := []struct {
+		name string
+		args args
+		want []topicPartitionAssignment
+	}{
+		{
+			name: "Empty",
+			args: args{
+				assignments: make([]topicPartitionAssignment, 0),
+				topic:       topicPartitionAssignment{Topic: "t1", Partition: 0},
+			},
+			want: make([]topicPartitionAssignment, 0),
+		},
+		{
+			name: "Remove first entry",
+			args: args{
+				assignments: []topicPartitionAssignment{
+					topicPartitionAssignment{Topic: "t1", Partition: 0},
+					topicPartitionAssignment{Topic: "t1", Partition: 1},
+					topicPartitionAssignment{Topic: "t1", Partition: 2},
+				},
+				topic: topicPartitionAssignment{Topic: "t1", Partition: 0},
+			},
+			want: []topicPartitionAssignment{
+				topicPartitionAssignment{Topic: "t1", Partition: 1},
+				topicPartitionAssignment{Topic: "t1", Partition: 2},
+			},
+		},
+		{
+			name: "Remove middle entry",
+			args: args{
+				assignments: []topicPartitionAssignment{
+					topicPartitionAssignment{Topic: "t1", Partition: 0},
+					topicPartitionAssignment{Topic: "t1", Partition: 1},
+					topicPartitionAssignment{Topic: "t1", Partition: 2},
+				},
+				topic: topicPartitionAssignment{Topic: "t1", Partition: 1},
+			},
+			want: []topicPartitionAssignment{
+				topicPartitionAssignment{Topic: "t1", Partition: 0},
+				topicPartitionAssignment{Topic: "t1", Partition: 2},
+			},
+		},
+		{
+			name: "Remove last entry",
+			args: args{
+				assignments: []topicPartitionAssignment{
+					topicPartitionAssignment{Topic: "t1", Partition: 0},
+					topicPartitionAssignment{Topic: "t1", Partition: 1},
+					topicPartitionAssignment{Topic: "t1", Partition: 2},
+				},
+				topic: topicPartitionAssignment{Topic: "t1", Partition: 2},
+			},
+			want: []topicPartitionAssignment{
+				topicPartitionAssignment{Topic: "t1", Partition: 0},
+				topicPartitionAssignment{Topic: "t1", Partition: 1},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := removeTopicPartitionFromMemberAssignments(tt.args.assignments, tt.args.topic); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("removeTopicPartitionFromMemberAssignments() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_removeIndexFromStringSlice(t *testing.T) {
+	type args struct {
+		s []string
+		i int
+	}
+	tests := []struct {
+		name string
+		args args
+		want []string
+	}{
+		{
+			name: "Empty slice",
+			args: args{
+				s: make([]string, 0),
+				i: 0,
+			},
+			want: make([]string, 0),
+		},
+		{
+			name: "Slice with single entry",
+			args: args{
+				s: []string{"foo"},
+				i: 0,
+			},
+			want: make([]string, 0),
+		},
+		{
+			name: "Slice with multiple entries",
+			args: args{
+				s: []string{"a", "b", "c"},
+				i: 0,
+			},
+			want: []string{"b", "c"},
+		},
+		{
+			name: "Slice with multiple entries and index is in the middle",
+			args: args{
+				s: []string{"a", "b", "c"},
+				i: 1,
+			},
+			want: []string{"a", "c"},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := removeIndexFromStringSlice(tt.args.s, tt.args.i); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("removeIndexFromSlice() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_removeValueFromStringSlice(t *testing.T) {
+	type args struct {
+		s []string
+		e string
+	}
+	tests := []struct {
+		name string
+		args args
+		want []string
+	}{
+		{
+			name: "Empty input slice",
+			args: args{
+				s: []string{},
+				e: "",
+			},
+			want: []string{},
+		},
+		{
+			name: "Input slice with one entry that doesn't match",
+			args: args{
+				s: []string{"a"},
+				e: "b",
+			},
+			want: []string{"a"},
+		},
+		{
+			name: "Input slice with multiple entries and a positive match",
+			args: args{
+				s: []string{"a", "b", "c"},
+				e: "b",
+			},
+			want: []string{"a", "c"},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := removeValueFromStringSlice(tt.args.s, tt.args.e); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("removeValueFromSlice() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_assignPartition(t *testing.T) {
+	type args struct {
+		partition                       topicPartitionAssignment
+		sortedCurrentSubscriptions      []string
+		currentAssignment               map[string][]topicPartitionAssignment
+		consumer2AllPotentialPartitions map[string][]topicPartitionAssignment
+		currentPartitionConsumer        map[topicPartitionAssignment]string
+	}
+	tests := []struct {
+		name                         string
+		args                         args
+		want                         []string
+		wantCurrentAssignment        map[string][]topicPartitionAssignment
+		wantCurrentPartitionConsumer map[topicPartitionAssignment]string
+	}{
+		{
+			name: "Base",
+			args: args{
+				partition:                  topicPartitionAssignment{Topic: "t1", Partition: 2},
+				sortedCurrentSubscriptions: []string{"c3", "c1", "c2"},
+				currentAssignment: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 0},
+					},
+					"c2": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 1},
+					},
+					"c3": []topicPartitionAssignment{},
+				},
+				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 0},
+					},
+					"c2": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 1},
+					},
+					"c3": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 2},
+					},
+				},
+				currentPartitionConsumer: map[topicPartitionAssignment]string{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: "c1",
+					topicPartitionAssignment{Topic: "t1", Partition: 1}: "c2",
+				},
+			},
+			want: []string{"c1", "c2", "c3"},
+			wantCurrentAssignment: map[string][]topicPartitionAssignment{
+				"c1": []topicPartitionAssignment{
+					topicPartitionAssignment{Topic: "t1", Partition: 0},
+				},
+				"c2": []topicPartitionAssignment{
+					topicPartitionAssignment{Topic: "t1", Partition: 1},
+				},
+				"c3": []topicPartitionAssignment{
+					topicPartitionAssignment{Topic: "t1", Partition: 2},
+				},
+			},
+			wantCurrentPartitionConsumer: map[topicPartitionAssignment]string{
+				topicPartitionAssignment{Topic: "t1", Partition: 0}: "c1",
+				topicPartitionAssignment{Topic: "t1", Partition: 1}: "c2",
+				topicPartitionAssignment{Topic: "t1", Partition: 2}: "c3",
+			},
+		},
+		{
+			name: "Unassignable Partition",
+			args: args{
+				partition:                  topicPartitionAssignment{Topic: "t1", Partition: 3},
+				sortedCurrentSubscriptions: []string{"c3", "c1", "c2"},
+				currentAssignment: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 0},
+					},
+					"c2": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 1},
+					},
+					"c3": []topicPartitionAssignment{},
+				},
+				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
+					"c1": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 0},
+					},
+					"c2": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 1},
+					},
+					"c3": []topicPartitionAssignment{
+						topicPartitionAssignment{Topic: "t1", Partition: 2},
+					},
+				},
+				currentPartitionConsumer: map[topicPartitionAssignment]string{
+					topicPartitionAssignment{Topic: "t1", Partition: 0}: "c1",
+					topicPartitionAssignment{Topic: "t1", Partition: 1}: "c2",
+				},
+			},
+			want: []string{"c3", "c1", "c2"},
+			wantCurrentAssignment: map[string][]topicPartitionAssignment{
+				"c1": []topicPartitionAssignment{
+					topicPartitionAssignment{Topic: "t1", Partition: 0},
+				},
+				"c2": []topicPartitionAssignment{
+					topicPartitionAssignment{Topic: "t1", Partition: 1},
+				},
+				"c3": []topicPartitionAssignment{},
+			},
+			wantCurrentPartitionConsumer: map[topicPartitionAssignment]string{
+				topicPartitionAssignment{Topic: "t1", Partition: 0}: "c1",
+				topicPartitionAssignment{Topic: "t1", Partition: 1}: "c2",
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := assignPartition(tt.args.partition, tt.args.sortedCurrentSubscriptions, tt.args.currentAssignment, tt.args.consumer2AllPotentialPartitions, tt.args.currentPartitionConsumer); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("assignPartition() = %v, want %v", got, tt.want)
+			}
+			if !reflect.DeepEqual(tt.args.currentAssignment, tt.wantCurrentAssignment) {
+				t.Errorf("assignPartition() currentAssignment = %v, want %v", tt.args.currentAssignment, tt.wantCurrentAssignment)
+			}
+			if !reflect.DeepEqual(tt.args.currentPartitionConsumer, tt.wantCurrentPartitionConsumer) {
+				t.Errorf("assignPartition() currentPartitionConsumer = %v, want %v", tt.args.currentPartitionConsumer, tt.wantCurrentPartitionConsumer)
+			}
+		})
+	}
+}
+
+func Test_stickyBalanceStrategy_Plan(t *testing.T) {
+	type args struct {
+		members map[string]ConsumerGroupMemberMetadata
+		topics  map[string][]int32
+	}
+	tests := []struct {
+		name string
+		s    *stickyBalanceStrategy
+		args args
+	}{
+		{
+			name: "One consumer with no topics",
+			args: args{
+				members: map[string]ConsumerGroupMemberMetadata{
+					"consumer": ConsumerGroupMemberMetadata{},
+				},
+				topics: make(map[string][]int32),
+			},
+		},
+		{
+			name: "One consumer with non-existent topic",
+			args: args{
+				members: map[string]ConsumerGroupMemberMetadata{
+					"consumer": ConsumerGroupMemberMetadata{
+						Topics: []string{"topic"},
+					},
+				},
+				topics: map[string][]int32{
+					"topic": make([]int32, 0),
+				},
+			},
+		},
+		{
+			name: "One consumer with one topic",
+			args: args{
+				members: map[string]ConsumerGroupMemberMetadata{
+					"consumer": ConsumerGroupMemberMetadata{
+						Topics: []string{"topic"},
+					},
+				},
+				topics: map[string][]int32{
+					"topic": []int32{0, 1, 2},
+				},
+			},
+		},
+		{
+			name: "Only assigns partitions from subscribed topics",
+			args: args{
+				members: map[string]ConsumerGroupMemberMetadata{
+					"consumer": ConsumerGroupMemberMetadata{
+						Topics: []string{"topic"},
+					},
+				},
+				topics: map[string][]int32{
+					"topic": []int32{0, 1, 2},
+					"other": []int32{0, 1, 2},
+				},
+			},
+		},
+		{
+			name: "One consumer with multiple topics",
+			args: args{
+				members: map[string]ConsumerGroupMemberMetadata{
+					"consumer": ConsumerGroupMemberMetadata{
+						Topics: []string{"topic1", "topic2"},
+					},
+				},
+				topics: map[string][]int32{
+					"topic1": []int32{0},
+					"topic2": []int32{0, 1},
+				},
+			},
+		},
+		{
+			name: "Two consumers with one topic and one partition",
+			args: args{
+				members: map[string]ConsumerGroupMemberMetadata{
+					"consumer1": ConsumerGroupMemberMetadata{
+						Topics: []string{"topic"},
+					},
+					"consumer2": ConsumerGroupMemberMetadata{
+						Topics: []string{"topic"},
+					},
+				},
+				topics: map[string][]int32{
+					"topic": []int32{0},
+				},
+			},
+		},
+		{
+			name: "Two consumers with one topic and two partitions",
+			args: args{
+				members: map[string]ConsumerGroupMemberMetadata{
+					"consumer1": ConsumerGroupMemberMetadata{
+						Topics: []string{"topic"},
+					},
+					"consumer2": ConsumerGroupMemberMetadata{
+						Topics: []string{"topic"},
+					},
+				},
+				topics: map[string][]int32{
+					"topic": []int32{0, 1},
+				},
+			},
+		},
+		{
+			name: "Multiple consumers with mixed topic subscriptions",
+			args: args{
+				members: map[string]ConsumerGroupMemberMetadata{
+					"consumer1": ConsumerGroupMemberMetadata{
+						Topics: []string{"topic1"},
+					},
+					"consumer2": ConsumerGroupMemberMetadata{
+						Topics: []string{"topic1", "topic2"},
+					},
+					"consumer3": ConsumerGroupMemberMetadata{
+						Topics: []string{"topic1"},
+					},
+				},
+				topics: map[string][]int32{
+					"topic1": []int32{0, 1, 2},
+					"topic2": []int32{0, 1},
+				},
+			},
+		},
+		{
+			name: "Two consumers with two topics and six partitions",
+			args: args{
+				members: map[string]ConsumerGroupMemberMetadata{
+					"consumer1": ConsumerGroupMemberMetadata{
+						Topics: []string{"topic1", "topic2"},
+					},
+					"consumer2": ConsumerGroupMemberMetadata{
+						Topics: []string{"topic1", "topic2"},
+					},
+				},
+				topics: map[string][]int32{
+					"topic1": []int32{0, 1, 2},
+					"topic2": []int32{0, 1, 2},
+				},
+			},
+		},
+		{
+			name: "Three consumers (two old, one new) with one topic and twelve partitions",
+			args: args{
+				members: map[string]ConsumerGroupMemberMetadata{
+					"consumer1": ConsumerGroupMemberMetadata{
+						Topics:   []string{"topic1"},
+						UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": []int32{4, 11, 8, 5, 9, 2}}, 1),
+					},
+					"consumer2": ConsumerGroupMemberMetadata{
+						Topics:   []string{"topic1"},
+						UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": []int32{1, 3, 0, 7, 10, 6}}, 1),
+					},
+					"consumer3": ConsumerGroupMemberMetadata{
+						Topics: []string{"topic1"},
+					},
+				},
+				topics: map[string][]int32{
+					"topic1": []int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
+				},
+			},
+		},
+		{
+			name: "Three consumers (two old, one new) with one topic and 13 partitions",
+			args: args{
+				members: map[string]ConsumerGroupMemberMetadata{
+					"consumer1": ConsumerGroupMemberMetadata{
+						Topics:   []string{"topic1"},
+						UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": []int32{4, 11, 8, 5, 9, 2, 6}}, 1),
+					},
+					"consumer2": ConsumerGroupMemberMetadata{
+						Topics:   []string{"topic1"},
+						UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": []int32{1, 3, 0, 7, 10, 12}}, 1),
+					},
+					"consumer3": ConsumerGroupMemberMetadata{
+						Topics: []string{"topic1"},
+					},
+				},
+				topics: map[string][]int32{
+					"topic1": []int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			s := &stickyBalanceStrategy{}
+			plan, err := s.Plan(tt.args.members, tt.args.topics)
+			verifyPlanIsBalancedAndSticky(t, s, tt.args.members, plan, err)
+			verifyFullyBalanced(t, plan)
+		})
+	}
+}
+
+func Test_stickyBalanceStrategy_Plan_KIP54_ExampleOne(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	// PLAN 1
+	members := map[string]ConsumerGroupMemberMetadata{
+		"consumer1": ConsumerGroupMemberMetadata{
+			Topics: []string{"topic1", "topic2", "topic3", "topic4"},
+		},
+		"consumer2": ConsumerGroupMemberMetadata{
+			Topics: []string{"topic1", "topic2", "topic3", "topic4"},
+		},
+		"consumer3": ConsumerGroupMemberMetadata{
+			Topics: []string{"topic1", "topic2", "topic3", "topic4"},
+		},
+	}
+	topics := map[string][]int32{
+		"topic1": []int32{0, 1},
+		"topic2": []int32{0, 1},
+		"topic3": []int32{0, 1},
+		"topic4": []int32{0, 1},
+	}
+	plan1, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
+	verifyFullyBalanced(t, plan1)
+
+	// PLAN 2
+	delete(members, "consumer1")
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1", "topic2", "topic3", "topic4"},
+		UserData: encodeSubscriberPlan(t, plan1["consumer2"]),
+	}
+	members["consumer3"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1", "topic2", "topic3", "topic4"},
+		UserData: encodeSubscriberPlan(t, plan1["consumer3"]),
+	}
+	plan2, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
+	verifyFullyBalanced(t, plan2)
+}
+
+func Test_stickyBalanceStrategy_Plan_KIP54_ExampleTwo(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	// PLAN 1
+	members := map[string]ConsumerGroupMemberMetadata{
+		"consumer1": ConsumerGroupMemberMetadata{
+			Topics: []string{"topic1"},
+		},
+		"consumer2": ConsumerGroupMemberMetadata{
+			Topics: []string{"topic1", "topic2"},
+		},
+		"consumer3": ConsumerGroupMemberMetadata{
+			Topics: []string{"topic1", "topic2", "topic3"},
+		},
+	}
+	topics := map[string][]int32{
+		"topic1": []int32{0},
+		"topic2": []int32{0, 1},
+		"topic3": []int32{0, 1, 2},
+	}
+	plan1, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
+	if len(plan1["consumer1"]["topic1"]) != 1 || len(plan1["consumer2"]["topic2"]) != 2 || len(plan1["consumer3"]["topic3"]) != 3 {
+		t.Error("Incorrect distribution of topic partition assignments")
+	}
+
+	// PLAN 2
+	delete(members, "consumer1")
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics:   members["consumer2"].Topics,
+		UserData: encodeSubscriberPlan(t, plan1["consumer2"]),
+	}
+	members["consumer3"] = ConsumerGroupMemberMetadata{
+		Topics:   members["consumer3"].Topics,
+		UserData: encodeSubscriberPlan(t, plan1["consumer3"]),
+	}
+	plan2, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
+	verifyFullyBalanced(t, plan2)
+	if len(plan2["consumer2"]["topic1"]) != 1 || len(plan2["consumer2"]["topic2"]) != 2 || len(plan2["consumer3"]["topic3"]) != 3 {
+		t.Error("Incorrect distribution of topic partition assignments")
+	}
+}
+
+func Test_stickyBalanceStrategy_Plan_KIP54_ExampleThree(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+	topicNames := []string{"topic1", "topic2"}
+
+	// PLAN 1
+	members := map[string]ConsumerGroupMemberMetadata{
+		"consumer1": ConsumerGroupMemberMetadata{
+			Topics: topicNames,
+		},
+		"consumer2": ConsumerGroupMemberMetadata{
+			Topics: topicNames,
+		},
+	}
+	topics := map[string][]int32{
+		"topic1": []int32{0, 1},
+		"topic2": []int32{0, 1},
+	}
+	plan1, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
+
+	// PLAN 2
+	members["consumer1"] = ConsumerGroupMemberMetadata{
+		Topics: topicNames,
+	}
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics:   topicNames,
+		UserData: encodeSubscriberPlan(t, plan1["consumer2"]),
+	}
+	members["consumer3"] = ConsumerGroupMemberMetadata{
+		Topics:   topicNames,
+		UserData: encodeSubscriberPlan(t, plan1["consumer3"]),
+	}
+	plan2, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
+	verifyFullyBalanced(t, plan2)
+}
+
+func Test_stickyBalanceStrategy_Plan_AddRemoveConsumerOneTopic(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	// PLAN 1
+	members := map[string]ConsumerGroupMemberMetadata{
+		"consumer1": ConsumerGroupMemberMetadata{
+			Topics: []string{"topic"},
+		},
+	}
+	topics := map[string][]int32{
+		"topic": []int32{0, 1, 2},
+	}
+	plan1, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
+
+	// PLAN 2
+	members["consumer1"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic"},
+		UserData: encodeSubscriberPlan(t, plan1["consumer1"]),
+	}
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics: []string{"topic"},
+	}
+	plan2, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
+
+	// PLAN 3
+	delete(members, "consumer1")
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic"},
+		UserData: encodeSubscriberPlan(t, plan2["consumer2"]),
+	}
+	plan3, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan3, err)
+}
+
+func Test_stickyBalanceStrategy_Plan_PoorRoundRobinAssignmentScenario(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	// PLAN 1
+	members := map[string]ConsumerGroupMemberMetadata{
+		"consumer1": ConsumerGroupMemberMetadata{
+			Topics: []string{"topic1", "topic2", "topic3", "topic4", "topic5"},
+		},
+		"consumer2": ConsumerGroupMemberMetadata{
+			Topics: []string{"topic1", "topic3", "topic5"},
+		},
+		"consumer3": ConsumerGroupMemberMetadata{
+			Topics: []string{"topic1", "topic3", "topic5"},
+		},
+		"consumer4": ConsumerGroupMemberMetadata{
+			Topics: []string{"topic1", "topic2", "topic3", "topic4", "topic5"},
+		},
+	}
+	topics := make(map[string][]int32, 5)
+	for i := 1; i <= 5; i++ {
+		partitions := make([]int32, i%2+1)
+		for j := 0; j < i%2+1; j++ {
+			partitions[j] = int32(j)
+		}
+		topics[fmt.Sprintf("topic%d", i)] = partitions
+	}
+
+	plan, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
+}
+
+func Test_stickyBalanceStrategy_Plan_AddRemoveTopicTwoConsumers(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	// PLAN 1
+	members := map[string]ConsumerGroupMemberMetadata{
+		"consumer1": ConsumerGroupMemberMetadata{
+			Topics: []string{"topic1"},
+		},
+		"consumer2": ConsumerGroupMemberMetadata{
+			Topics: []string{"topic1"},
+		},
+	}
+	topics := map[string][]int32{
+		"topic1": []int32{0, 1, 2},
+	}
+	plan1, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
+	verifyFullyBalanced(t, plan1)
+
+	// PLAN 2
+	members["consumer1"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1", "topic2"},
+		UserData: encodeSubscriberPlan(t, plan1["consumer1"]),
+	}
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1", "topic2"},
+		UserData: encodeSubscriberPlan(t, plan1["consumer2"]),
+	}
+	topics["topic2"] = []int32{0, 1, 2}
+
+	plan2, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
+	verifyFullyBalanced(t, plan2)
+
+	// PLAN 3
+	members["consumer1"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1", "topic2"},
+		UserData: encodeSubscriberPlan(t, plan2["consumer1"]),
+	}
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1", "topic2"},
+		UserData: encodeSubscriberPlan(t, plan2["consumer2"]),
+	}
+	delete(topics, "topic1")
+
+	plan3, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan3, err)
+	verifyFullyBalanced(t, plan3)
+}
+
+func Test_stickyBalanceStrategy_Plan_ReassignmentAfterOneConsumerLeaves(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	// PLAN 1
+	members := make(map[string]ConsumerGroupMemberMetadata, 20)
+	for i := 0; i < 20; i++ {
+		topics := make([]string, 20)
+		for j := 0; j < 20; j++ {
+			topics[j] = fmt.Sprintf("topic%d", j)
+		}
+		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
+	}
+	topics := make(map[string][]int32, 20)
+	for i := 0; i < 20; i++ {
+		partitions := make([]int32, 20)
+		for j := 0; j < 20; j++ {
+			partitions[j] = int32(j)
+		}
+		topics[fmt.Sprintf("topic%d", i)] = partitions
+	}
+
+	plan1, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
+
+	for i := 0; i < 20; i++ {
+		topics := make([]string, 20)
+		for j := 0; j < 20; j++ {
+			topics[j] = fmt.Sprintf("topic%d", j)
+		}
+		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
+			Topics:   members[fmt.Sprintf("consumer%d", i)].Topics,
+			UserData: encodeSubscriberPlan(t, plan1[fmt.Sprintf("consumer%d", i)]),
+		}
+	}
+	delete(members, "consumer10")
+
+	plan2, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
+}
+
+func Test_stickyBalanceStrategy_Plan_ReassignmentAfterOneConsumerAdded(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	// PLAN 1
+	members := make(map[string]ConsumerGroupMemberMetadata)
+	for i := 0; i < 10; i++ {
+		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: []string{"topic1"}}
+	}
+	partitions := make([]int32, 20)
+	for j := 0; j < 20; j++ {
+		partitions[j] = int32(j)
+	}
+	topics := map[string][]int32{"topic1": partitions}
+
+	plan1, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
+
+	// add a new consumer
+	members["consumer10"] = ConsumerGroupMemberMetadata{Topics: []string{"topic1"}}
+
+	plan2, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
+}
+
+func Test_stickyBalanceStrategy_Plan_SameSubscriptions(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	// PLAN 1
+	members := make(map[string]ConsumerGroupMemberMetadata, 20)
+	for i := 0; i < 9; i++ {
+		topics := make([]string, 15)
+		for j := 0; j < 15; j++ {
+			topics[j] = fmt.Sprintf("topic%d", j)
+		}
+		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
+	}
+	topics := make(map[string][]int32, 15)
+	for i := 0; i < 15; i++ {
+		partitions := make([]int32, i)
+		for j := 0; j < i; j++ {
+			partitions[j] = int32(j)
+		}
+		topics[fmt.Sprintf("topic%d", i)] = partitions
+	}
+
+	plan1, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
+
+	// PLAN 2
+	for i := 0; i < 9; i++ {
+		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
+			Topics:   members[fmt.Sprintf("consumer%d", i)].Topics,
+			UserData: encodeSubscriberPlan(t, plan1[fmt.Sprintf("consumer%d", i)]),
+		}
+	}
+	delete(members, "consumer5")
+
+	plan2, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
+}
+
+func Test_stickyBalanceStrategy_Plan_LargeAssignmentWithMultipleConsumersLeaving(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+
+	// PLAN 1
+	members := make(map[string]ConsumerGroupMemberMetadata, 20)
+	for i := 0; i < 200; i++ {
+		topics := make([]string, 200)
+		for j := 0; j < 200; j++ {
+			topics[j] = fmt.Sprintf("topic%d", j)
+		}
+		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
+	}
+	topics := make(map[string][]int32, 40)
+	for i := 0; i < 40; i++ {
+		partitionCount := r.Intn(20)
+		partitions := make([]int32, partitionCount)
+		for j := 0; j < partitionCount; j++ {
+			partitions[j] = int32(j)
+		}
+		topics[fmt.Sprintf("topic%d", i)] = partitions
+	}
+
+	plan1, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
+
+	for i := 0; i < 200; i++ {
+		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
+			Topics:   members[fmt.Sprintf("consumer%d", i)].Topics,
+			UserData: encodeSubscriberPlan(t, plan1[fmt.Sprintf("consumer%d", i)]),
+		}
+	}
+	for i := 0; i < 50; i++ {
+		delete(members, fmt.Sprintf("consumer%d", i))
+	}
+
+	plan2, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
+}
+
+func Test_stickyBalanceStrategy_Plan_NewSubscription(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	members := make(map[string]ConsumerGroupMemberMetadata, 20)
+	for i := 0; i < 3; i++ {
+		topics := make([]string, 0)
+		for j := i; j <= 3*i-2; j++ {
+			topics = append(topics, fmt.Sprintf("topic%d", j))
+		}
+		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
+	}
+	topics := make(map[string][]int32, 5)
+	for i := 1; i < 5; i++ {
+		topics[fmt.Sprintf("topic%d", i)] = []int32{0}
+	}
+
+	plan1, err := s.Plan(members, topics)
+	if err != nil {
+		t.Errorf("stickyBalanceStrategy.Plan() error = %v", err)
+		return
+	}
+	verifyValidityAndBalance(t, members, plan1)
+
+	members["consumer0"] = ConsumerGroupMemberMetadata{Topics: []string{"topic1"}}
+
+	plan2, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
+}
+
+func Test_stickyBalanceStrategy_Plan_ReassignmentWithRandomSubscriptionsAndChanges(t *testing.T) {
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+
+	minNumConsumers := 20
+	maxNumConsumers := 40
+	minNumTopics := 10
+	maxNumTopics := 20
+
+	for round := 0; round < 100; round++ {
+		numTopics := minNumTopics + r.Intn(maxNumTopics-minNumTopics)
+		topics := make([]string, numTopics)
+		partitionsPerTopic := make(map[string][]int32, numTopics)
+		for i := 0; i < numTopics; i++ {
+			topicName := fmt.Sprintf("topic%d", i)
+			topics[i] = topicName
+			partitions := make([]int32, maxNumTopics)
+			for j := 0; j < maxNumTopics; j++ {
+				partitions[j] = int32(j)
+			}
+			partitionsPerTopic[topicName] = partitions
+		}
+
+		numConsumers := minNumConsumers + r.Intn(maxNumConsumers-minNumConsumers)
+		members := make(map[string]ConsumerGroupMemberMetadata, numConsumers)
+		for i := 0; i < numConsumers; i++ {
+			sub := getRandomSublist(r, topics)
+			sort.Strings(sub)
+			members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: sub}
+		}
+
+		s := &stickyBalanceStrategy{}
+		plan, err := s.Plan(members, partitionsPerTopic)
+		verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
+
+		// PLAN 2
+		membersPlan2 := make(map[string]ConsumerGroupMemberMetadata, numConsumers)
+		for i := 0; i < numConsumers; i++ {
+			sub := getRandomSublist(r, topics)
+			sort.Strings(sub)
+			membersPlan2[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
+				Topics:   sub,
+				UserData: encodeSubscriberPlan(t, plan[fmt.Sprintf("consumer%d", i)]),
+			}
+		}
+		plan2, err := s.Plan(membersPlan2, partitionsPerTopic)
+		verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
+	}
+}
+
+func Test_stickyBalanceStrategy_Plan_MoveExistingAssignments(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	topics := make(map[string][]int32, 6)
+	for i := 1; i <= 6; i++ {
+		topics[fmt.Sprintf("topic%d", i)] = []int32{0}
+	}
+	members := make(map[string]ConsumerGroupMemberMetadata, 3)
+	members["consumer1"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1", "topic2"},
+		UserData: encodeSubscriberPlan(t, map[string][]int32{"topic1": []int32{0}}),
+	}
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1", "topic2", "topic3", "topic4"},
+		UserData: encodeSubscriberPlan(t, map[string][]int32{"topic2": []int32{0}, "topic3": []int32{0}}),
+	}
+	members["consumer3"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic2", "topic3", "topic4", "topic5", "topic6"},
+		UserData: encodeSubscriberPlan(t, map[string][]int32{"topic4": []int32{0}, "topic5": []int32{0}, "topic6": []int32{0}}),
+	}
+
+	plan, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
+}
+
+func Test_stickyBalanceStrategy_Plan_Stickiness(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	topics := map[string][]int32{"topic1": []int32{0, 1, 2}}
+	members := map[string]ConsumerGroupMemberMetadata{
+		"consumer1": ConsumerGroupMemberMetadata{Topics: []string{"topic1"}},
+		"consumer2": ConsumerGroupMemberMetadata{Topics: []string{"topic1"}},
+		"consumer3": ConsumerGroupMemberMetadata{Topics: []string{"topic1"}},
+		"consumer4": ConsumerGroupMemberMetadata{Topics: []string{"topic1"}},
+	}
+
+	plan1, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
+
+	// PLAN 2
+	// remove the potential group leader
+	delete(members, "consumer1")
+	for i := 2; i <= 4; i++ {
+		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
+			Topics:   []string{"topic1"},
+			UserData: encodeSubscriberPlan(t, plan1[fmt.Sprintf("consumer%d", i)]),
+		}
+	}
+
+	plan2, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
+}
+
+func Test_stickyBalanceStrategy_Plan_AssignmentUpdatedForDeletedTopic(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	topics := make(map[string][]int32, 2)
+	topics["topic1"] = []int32{0}
+	topics["topic3"] = make([]int32, 100)
+	for i := 0; i < 100; i++ {
+		topics["topic3"][i] = int32(i)
+	}
+	members := map[string]ConsumerGroupMemberMetadata{
+		"consumer1": ConsumerGroupMemberMetadata{Topics: []string{"topic1", "topic2", "topic3"}},
+	}
+
+	plan, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
+	verifyFullyBalanced(t, plan)
+	if (len(plan["consumer1"]["topic1"]) + len(plan["consumer1"]["topic3"])) != 101 {
+		t.Error("Incorrect number of partitions assigned")
+		return
+	}
+}
+
+func Test_stickyBalanceStrategy_Plan_NoExceptionRaisedWhenOnlySubscribedTopicDeleted(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	topics := map[string][]int32{"topic1": []int32{0, 1, 2}}
+	members := map[string]ConsumerGroupMemberMetadata{
+		"consumer1": ConsumerGroupMemberMetadata{Topics: []string{"topic1"}},
+	}
+	plan1, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
+
+	// PLAN 2
+	members["consumer1"] = ConsumerGroupMemberMetadata{
+		Topics:   members["consumer1"].Topics,
+		UserData: encodeSubscriberPlan(t, plan1["consumer1"]),
+	}
+
+	plan2, err := s.Plan(members, map[string][]int32{})
+	if len(plan2) != 1 {
+		t.Error("Incorrect number of consumers")
+		return
+	}
+	if len(plan2["consumer1"]) != 0 {
+		t.Error("Incorrect number of consumer topic assignments")
+		return
+	}
+	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
+}
+
+func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations1(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	topics := map[string][]int32{"topic1": []int32{0, 1, 2, 3, 4, 5}}
+	members := map[string]ConsumerGroupMemberMetadata{
+		"consumer1": ConsumerGroupMemberMetadata{Topics: []string{"topic1"}},
+		"consumer2": ConsumerGroupMemberMetadata{Topics: []string{"topic1"}},
+		"consumer3": ConsumerGroupMemberMetadata{Topics: []string{"topic1"}},
+	}
+	plan1, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
+	verifyFullyBalanced(t, plan1)
+
+	// PLAN 2
+	members["consumer1"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1"},
+		UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer1"], 1),
+	}
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1"},
+		UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer2"], 1),
+	}
+	delete(members, "consumer3")
+
+	plan2, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
+	verifyFullyBalanced(t, plan2)
+	if len(intersection(plan1["consumer1"]["topic1"], plan2["consumer1"]["topic1"])) != 2 {
+		t.Error("stickyBalanceStrategy.Plan() consumer1 didn't maintain partitions across reassignment")
+	}
+	if len(intersection(plan1["consumer2"]["topic1"], plan2["consumer2"]["topic1"])) != 2 {
+		t.Error("stickyBalanceStrategy.Plan() consumer1 didn't maintain partitions across reassignment")
+	}
+
+	// PLAN 3
+	delete(members, "consumer1")
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1"},
+		UserData: encodeSubscriberPlanWithGeneration(t, plan2["consumer2"], 2),
+	}
+	members["consumer3"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1"},
+		UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer3"], 1),
+	}
+
+	plan3, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan3, err)
+	verifyFullyBalanced(t, plan3)
+}
+
+func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations2(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	topics := map[string][]int32{"topic1": []int32{0, 1, 2, 3, 4, 5}}
+	members := map[string]ConsumerGroupMemberMetadata{
+		"consumer1": ConsumerGroupMemberMetadata{Topics: []string{"topic1"}},
+		"consumer2": ConsumerGroupMemberMetadata{Topics: []string{"topic1"}},
+		"consumer3": ConsumerGroupMemberMetadata{Topics: []string{"topic1"}},
+	}
+	plan1, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
+	verifyFullyBalanced(t, plan1)
+
+	// PLAN 2
+	delete(members, "consumer1")
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1"},
+		UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer2"], 1),
+	}
+	delete(members, "consumer3")
+
+	plan2, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
+	verifyFullyBalanced(t, plan2)
+	if len(intersection(plan1["consumer2"]["topic1"], plan2["consumer2"]["topic1"])) != 2 {
+		t.Error("stickyBalanceStrategy.Plan() consumer1 didn't maintain partitions across reassignment")
+	}
+
+	// PLAN 3
+	members["consumer1"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1"},
+		UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer1"], 1),
+	}
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1"},
+		UserData: encodeSubscriberPlanWithGeneration(t, plan2["consumer2"], 2),
+	}
+	members["consumer3"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1"},
+		UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer3"], 1),
+	}
+	plan3, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan3, err)
+	verifyFullyBalanced(t, plan3)
+}
+func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGenerations(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	topics := map[string][]int32{"topic1": []int32{0, 1, 2, 3, 4, 5}}
+	members := make(map[string]ConsumerGroupMemberMetadata, 3)
+	members["consumer1"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1"},
+		UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": []int32{0, 1, 4}}, 1),
+	}
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1"},
+		UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": []int32{0, 2, 3}}, 1),
+	}
+	members["consumer3"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1"},
+		UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": []int32{3, 4, 5}}, 2),
+	}
+
+	plan, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
+	verifyFullyBalanced(t, plan)
+}
+
+func Test_stickyBalanceStrategy_Plan_SchemaBackwardCompatibility(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	topics := map[string][]int32{"topic1": []int32{0, 1, 2}}
+	members := make(map[string]ConsumerGroupMemberMetadata, 3)
+	members["consumer1"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1"},
+		UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": []int32{0, 2}}, 1),
+	}
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1"},
+		UserData: encodeSubscriberPlanWithOldSchema(t, map[string][]int32{"topic1": []int32{1}}),
+	}
+	members["consumer3"] = ConsumerGroupMemberMetadata{Topics: []string{"topic1"}}
+
+	plan, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
+	verifyFullyBalanced(t, plan)
+}
+
+func Test_stickyBalanceStrategy_Plan_ConflictingPreviousAssignments(t *testing.T) {
+	s := &stickyBalanceStrategy{}
+
+	topics := map[string][]int32{"topic1": []int32{0, 1}}
+	members := make(map[string]ConsumerGroupMemberMetadata, 2)
+	members["consumer1"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1"},
+		UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": []int32{0, 1}}, 1),
+	}
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics:   []string{"topic1"},
+		UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": []int32{0, 1}}, 1),
+	}
+
+	plan, err := s.Plan(members, topics)
+	verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
+	verifyFullyBalanced(t, plan)
+}
+
+func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopics(b *testing.B) {
+	s := &stickyBalanceStrategy{}
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	members := make(map[string]ConsumerGroupMemberMetadata, 20)
+	for i := 0; i < 200; i++ {
+		topics := make([]string, 200)
+		for j := 0; j < 200; j++ {
+			topics[j] = fmt.Sprintf("topic%d", j)
+		}
+		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
+	}
+	topics := make(map[string][]int32, 40)
+	for i := 0; i < 40; i++ {
+		partitionCount := r.Intn(20)
+		partitions := make([]int32, partitionCount)
+		for j := 0; j < partitionCount; j++ {
+			partitions[j] = int32(j)
+		}
+		topics[fmt.Sprintf("topic%d", i)] = partitions
+	}
+	b.ResetTimer()
+
+	for n := 0; n < b.N; n++ {
+		if _, err := s.Plan(members, topics); err != nil {
+			b.Errorf("Error building plan in benchmark: %v", err)
+		}
+	}
+}
+
+func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopicsAndExistingAssignments(b *testing.B) {
+	s := &stickyBalanceStrategy{}
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	members := make(map[string]ConsumerGroupMemberMetadata, 20)
+	for i := 0; i < 200; i++ {
+		topics := make([]string, 200)
+		for j := 0; j < 200; j++ {
+			topics[j] = fmt.Sprintf("topic%d", j)
+		}
+		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
+	}
+	topics := make(map[string][]int32, 40)
+	for i := 0; i < 40; i++ {
+		partitionCount := r.Intn(20)
+		partitions := make([]int32, partitionCount)
+		for j := 0; j < partitionCount; j++ {
+			partitions[j] = int32(j)
+		}
+		topics[fmt.Sprintf("topic%d", i)] = partitions
+	}
+	plan, _ := s.Plan(members, topics)
+
+	for i := 0; i < 200; i++ {
+		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
+			Topics:   members[fmt.Sprintf("consumer%d", i)].Topics,
+			UserData: encodeSubscriberPlanWithGenerationForBenchmark(b, plan[fmt.Sprintf("consumer%d", i)], 1),
+		}
+	}
+	for i := 0; i < 1; i++ {
+		delete(members, fmt.Sprintf("consumer%d", i))
+	}
+	b.ResetTimer()
+
+	for n := 0; n < b.N; n++ {
+		if _, err := s.Plan(members, topics); err != nil {
+			b.Errorf("Error building plan in benchmark: %v", err)
+		}
+	}
+}
+
+func verifyPlanIsBalancedAndSticky(t *testing.T, s *stickyBalanceStrategy, members map[string]ConsumerGroupMemberMetadata, plan BalanceStrategyPlan, err error) {
+	if err != nil {
+		t.Errorf("stickyBalanceStrategy.Plan() error = %v", err)
+		return
+	}
+	if !s.movements.isSticky() {
+		t.Error("stickyBalanceStrategy.Plan() not sticky")
+		return
+	}
+	verifyValidityAndBalance(t, members, plan)
+}
+
+func verifyValidityAndBalance(t *testing.T, consumers map[string]ConsumerGroupMemberMetadata, plan BalanceStrategyPlan) {
+	size := len(consumers)
+	if size != len(plan) {
+		t.Errorf("Subscription size (%d) not equal to plan size (%d)", size, len(plan))
+		t.FailNow()
+	}
+
+	members := make([]string, size)
+	i := 0
+	for memberID := range consumers {
+		members[i] = memberID
+		i++
+	}
+	sort.Strings(members)
+
+	for i, memberID := range members {
+		for assignedTopic := range plan[memberID] {
+			found := false
+			for _, assignableTopic := range consumers[memberID].Topics {
+				if assignableTopic == assignableTopic {
+					found = true
+					break
+				}
+			}
+			if !found {
+				t.Errorf("Consumer %s had assigned topic %s that wasn't in the list of assignable topics", memberID, assignedTopic)
+				t.FailNow()
+			}
+		}
+
+		// skip last consumer
+		if i == len(members)-1 {
+			continue
+		}
+
+		consumerAssignments := make([]topicPartitionAssignment, 0)
+		for topic, partitions := range plan[memberID] {
+			for _, partition := range partitions {
+				consumerAssignments = append(consumerAssignments, topicPartitionAssignment{Topic: topic, Partition: partition})
+			}
+		}
+
+		for j := i + 1; j < size; j++ {
+			otherConsumer := members[j]
+			otherConsumerAssignments := make([]topicPartitionAssignment, 0)
+			for topic, partitions := range plan[otherConsumer] {
+				for _, partition := range partitions {
+					otherConsumerAssignments = append(otherConsumerAssignments, topicPartitionAssignment{Topic: topic, Partition: partition})
+				}
+			}
+			assignmentsIntersection := intersection(consumerAssignments, otherConsumerAssignments)
+			if len(assignmentsIntersection) > 0 {
+				t.Errorf("Consumers %s and %s have common partitions assigned to them: %v", memberID, otherConsumer, assignmentsIntersection)
+				t.FailNow()
+			}
+
+			if math.Abs(float64(len(consumerAssignments)-len(otherConsumerAssignments))) <= 1 {
+				continue
+			}
+
+			if len(consumerAssignments) > len(otherConsumerAssignments) {
+				for _, topic := range consumerAssignments {
+					if _, exists := plan[otherConsumer][topic.Topic]; exists {
+						t.Errorf("Some partitions can be moved from %s to %s to achieve a better balance, %s has %d assignments, and %s has %d assignments", otherConsumer, memberID, memberID, len(consumerAssignments), otherConsumer, len(otherConsumerAssignments))
+						t.FailNow()
+					}
+				}
+			}
+
+			if len(otherConsumerAssignments) > len(consumerAssignments) {
+				for _, topic := range otherConsumerAssignments {
+					if _, exists := plan[memberID][topic.Topic]; exists {
+						t.Errorf("Some partitions can be moved from %s to %s to achieve a better balance, %s has %d assignments, and %s has %d assignments", memberID, otherConsumer, otherConsumer, len(otherConsumerAssignments), memberID, len(consumerAssignments))
+						t.FailNow()
+					}
+				}
+			}
+		}
+	}
+}
+
+// Produces the intersection of two slices
+// From https://github.com/juliangruber/go-intersect
+func intersection(a interface{}, b interface{}) []interface{} {
+	set := make([]interface{}, 0)
+	hash := make(map[interface{}]bool)
+	av := reflect.ValueOf(a)
+	bv := reflect.ValueOf(b)
+
+	for i := 0; i < av.Len(); i++ {
+		el := av.Index(i).Interface()
+		hash[el] = true
+	}
+
+	for i := 0; i < bv.Len(); i++ {
+		el := bv.Index(i).Interface()
+		if _, found := hash[el]; found {
+			set = append(set, el)
+		}
+	}
+
+	return set
+}
+
+func encodeSubscriberPlan(t *testing.T, assignments map[string][]int32) []byte {
+	return encodeSubscriberPlanWithGeneration(t, assignments, defaultGeneration)
+}
+
+func encodeSubscriberPlanWithGeneration(t *testing.T, assignments map[string][]int32, generation int32) []byte {
+	userDataBytes, err := encode(&StickyAssignorUserDataV1{
+		Topics:     assignments,
+		Generation: generation,
+	}, nil)
+	if err != nil {
+		t.Errorf("encodeSubscriberPlan error = %v", err)
+		t.FailNow()
+	}
+	return userDataBytes
+}
+
+func encodeSubscriberPlanWithGenerationForBenchmark(b *testing.B, assignments map[string][]int32, generation int32) []byte {
+	userDataBytes, err := encode(&StickyAssignorUserDataV1{
+		Topics:     assignments,
+		Generation: generation,
+	}, nil)
+	if err != nil {
+		b.Errorf("encodeSubscriberPlan error = %v", err)
+		b.FailNow()
+	}
+	return userDataBytes
+}
+
+func encodeSubscriberPlanWithOldSchema(t *testing.T, assignments map[string][]int32) []byte {
+	userDataBytes, err := encode(&StickyAssignorUserDataV0{
+		Topics: assignments,
+	}, nil)
+	if err != nil {
+		t.Errorf("encodeSubscriberPlan error = %v", err)
+		t.FailNow()
+	}
+	return userDataBytes
+}
+
+// verify that the plan is fully balanced, assumes that all consumers can
+// consume from the same set of topics
+func verifyFullyBalanced(t *testing.T, plan BalanceStrategyPlan) {
+	min := math.MaxInt32
+	max := math.MinInt32
+	for _, topics := range plan {
+		assignedPartitionsCount := 0
+		for _, partitions := range topics {
+			assignedPartitionsCount += len(partitions)
+		}
+		if assignedPartitionsCount < min {
+			min = assignedPartitionsCount
+		}
+		if assignedPartitionsCount > max {
+			max = assignedPartitionsCount
+		}
+	}
+	if (max - min) > 1 {
+		t.Errorf("Plan partition assignment is not fully balanced: min=%d, max=%d", min, max)
+	}
+}
+
+func getRandomSublist(r *rand.Rand, s []string) []string {
+	howManyToRemove := r.Intn(len(s))
+	allEntriesMap := make(map[int]string)
+	for i, s := range s {
+		allEntriesMap[i] = s
+	}
+	for i := 0; i < howManyToRemove; i++ {
+		delete(allEntriesMap, r.Intn(len(allEntriesMap)))
+	}
+
+	subList := make([]string, len(allEntriesMap))
+	i := 0
+	for _, s := range allEntriesMap {
+		subList[i] = s
+		i++
+	}
+	return subList
+}

+ 23 - 5
consumer_group.go

@@ -63,6 +63,8 @@ type consumerGroup struct {
 	lock      sync.Mutex
 	closed    chan none
 	closeOnce sync.Once
+
+	userData []byte
 }
 
 // NewConsumerGroup creates a new consumer group the given broker addresses and configuration.
@@ -282,6 +284,7 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
 			return nil, err
 		}
 		claims = members.Topics
+		c.userData = members.UserData
 
 		for _, partitions := range claims {
 			sort.Sort(int32Slice(partitions))
@@ -303,9 +306,14 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
 		req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
 	}
 
+	// use static user-data if configured, otherwise use consumer-group userdata from the last sync
+	userData := c.config.Consumer.Group.Member.UserData
+	if len(userData) == 0 {
+		userData = c.userData
+	}
 	meta := &ConsumerGroupMemberMetadata{
 		Topics:   topics,
-		UserData: c.config.Consumer.Group.Member.UserData,
+		UserData: userData,
 	}
 	strategy := c.config.Consumer.Group.Rebalance.Strategy
 	if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
@@ -322,10 +330,20 @@ func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrate
 		GenerationId: generationID,
 	}
 	for memberID, topics := range plan {
-		err := req.AddGroupAssignmentMember(memberID, &ConsumerGroupMemberAssignment{
-			Topics: topics,
-		})
-		if err != nil {
+		assignment := &ConsumerGroupMemberAssignment{Topics: topics}
+
+		// Include topic assignments in group-assignment userdata for each consumer-group member
+		if c.config.Consumer.Group.Rebalance.Strategy == BalanceStrategySticky {
+			userDataBytes, err := encode(&StickyAssignorUserDataV1{
+				Topics:     topics,
+				Generation: generationID,
+			}, nil)
+			if err != nil {
+				return nil, err
+			}
+			assignment.UserData = userDataBytes
+		}
+		if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
 			return nil, err
 		}
 	}

+ 19 - 6
examples/consumergroup/main.go

@@ -15,12 +15,13 @@ import (
 
 // Sarma configuration options
 var (
-	brokers = ""
-	version = ""
-	group   = ""
-	topics  = ""
-	oldest  = true
-	verbose = false
+	brokers  = ""
+	version  = ""
+	group    = ""
+	topics   = ""
+	assignor = ""
+	oldest   = true
+	verbose  = false
 )
 
 func init() {
@@ -28,6 +29,7 @@ func init() {
 	flag.StringVar(&group, "group", "", "Kafka consumer group definition")
 	flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version")
 	flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma seperated list")
+	flag.StringVar(&assignor, "assignor", "range", "Consumer group partition assignment strategy (range, roundrobin, sticky)")
 	flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial ofset from oldest")
 	flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
 	flag.Parse()
@@ -64,6 +66,17 @@ func main() {
 	config := sarama.NewConfig()
 	config.Version = version
 
+	switch assignor {
+	case "sticky":
+		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
+	case "roundrobin":
+		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
+	case "range":
+		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
+	default:
+		log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
+	}
+
 	if oldest {
 		config.Consumer.Offsets.Initial = sarama.OffsetOldest
 	}

+ 124 - 0
sticky_assignor_user_data.go

@@ -0,0 +1,124 @@
+package sarama
+
+type topicPartitionAssignment struct {
+	Topic     string
+	Partition int32
+}
+
+type StickyAssignorUserData interface {
+	partitions() []topicPartitionAssignment
+	hasGeneration() bool
+	generation() int
+}
+
+//StickyAssignorUserDataV0 holds topic partition information for an assignment
+type StickyAssignorUserDataV0 struct {
+	Topics map[string][]int32
+
+	topicPartitions []topicPartitionAssignment
+}
+
+func (m *StickyAssignorUserDataV0) encode(pe packetEncoder) error {
+	if err := pe.putArrayLength(len(m.Topics)); err != nil {
+		return err
+	}
+
+	for topic, partitions := range m.Topics {
+		if err := pe.putString(topic); err != nil {
+			return err
+		}
+		if err := pe.putInt32Array(partitions); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (m *StickyAssignorUserDataV0) decode(pd packetDecoder) (err error) {
+	var topicLen int
+	if topicLen, err = pd.getArrayLength(); err != nil {
+		return
+	}
+
+	m.Topics = make(map[string][]int32, topicLen)
+	for i := 0; i < topicLen; i++ {
+		var topic string
+		if topic, err = pd.getString(); err != nil {
+			return
+		}
+		if m.Topics[topic], err = pd.getInt32Array(); err != nil {
+			return
+		}
+	}
+	m.topicPartitions = populateTopicPartitions(m.Topics)
+	return nil
+}
+
+func (m *StickyAssignorUserDataV0) partitions() []topicPartitionAssignment { return m.topicPartitions }
+func (m *StickyAssignorUserDataV0) hasGeneration() bool                    { return false }
+func (m *StickyAssignorUserDataV0) generation() int                        { return defaultGeneration }
+
+//StickyAssignorUserDataV1 holds topic partition information for an assignment
+type StickyAssignorUserDataV1 struct {
+	Topics     map[string][]int32
+	Generation int32
+
+	topicPartitions []topicPartitionAssignment
+}
+
+func (m *StickyAssignorUserDataV1) encode(pe packetEncoder) error {
+	if err := pe.putArrayLength(len(m.Topics)); err != nil {
+		return err
+	}
+
+	for topic, partitions := range m.Topics {
+		if err := pe.putString(topic); err != nil {
+			return err
+		}
+		if err := pe.putInt32Array(partitions); err != nil {
+			return err
+		}
+	}
+
+	pe.putInt32(m.Generation)
+	return nil
+}
+
+func (m *StickyAssignorUserDataV1) decode(pd packetDecoder) (err error) {
+	var topicLen int
+	if topicLen, err = pd.getArrayLength(); err != nil {
+		return
+	}
+
+	m.Topics = make(map[string][]int32, topicLen)
+	for i := 0; i < topicLen; i++ {
+		var topic string
+		if topic, err = pd.getString(); err != nil {
+			return
+		}
+		if m.Topics[topic], err = pd.getInt32Array(); err != nil {
+			return
+		}
+	}
+
+	m.Generation, err = pd.getInt32()
+	if err != nil {
+		return err
+	}
+	m.topicPartitions = populateTopicPartitions(m.Topics)
+	return nil
+}
+
+func (m *StickyAssignorUserDataV1) partitions() []topicPartitionAssignment { return m.topicPartitions }
+func (m *StickyAssignorUserDataV1) hasGeneration() bool                    { return true }
+func (m *StickyAssignorUserDataV1) generation() int                        { return int(m.Generation) }
+
+func populateTopicPartitions(topics map[string][]int32) []topicPartitionAssignment {
+	topicPartitions := make([]topicPartitionAssignment, 0)
+	for topic, partitions := range topics {
+		for _, partition := range partitions {
+			topicPartitions = append(topicPartitions, topicPartitionAssignment{Topic: topic, Partition: partition})
+		}
+	}
+	return topicPartitions
+}

+ 51 - 0
sticky_assignor_user_data_test.go

@@ -0,0 +1,51 @@
+package sarama
+
+import (
+	"encoding/base64"
+	"testing"
+)
+
+func TestStickyAssignorUserDataV0(t *testing.T) {
+	// Single topic with deterministic ordering across encode-decode
+	req := &StickyAssignorUserDataV0{}
+	data := decodeUserDataBytes(t, "AAAAAQADdDAzAAAAAQAAAAU=")
+	testDecodable(t, "", req, data)
+	testEncodable(t, "", req, data)
+
+	// Multiple partitions
+	req = &StickyAssignorUserDataV0{}
+	data = decodeUserDataBytes(t, "AAAAAQADdDE4AAAAEgAAAAAAAAABAAAAAgAAAAMAAAAEAAAABQAAAAYAAAAHAAAACAAAAAkAAAAKAAAACwAAAAwAAAANAAAADgAAAA8AAAAQAAAAEQ==")
+	testDecodable(t, "", req, data)
+
+	// Multiple topics and partitions
+	req = &StickyAssignorUserDataV0{}
+	data = decodeUserDataBytes(t, "AAAABQADdDEyAAAAAgAAAAIAAAAKAAN0MTEAAAABAAAABAADdDE0AAAAAQAAAAgAA3QxMwAAAAEAAAANAAN0MDkAAAABAAAABQ==")
+	testDecodable(t, "", req, data)
+}
+
+func TestStickyAssignorUserDataV1(t *testing.T) {
+	// Single topic with deterministic ordering across encode-decode
+	req := &StickyAssignorUserDataV1{}
+	data := decodeUserDataBytes(t, "AAAAAQADdDA2AAAAAgAAAAAAAAAE/////w==")
+	testDecodable(t, "", req, data)
+	testEncodable(t, "", req, data)
+
+	// Multiple topics and partitions
+	req = &StickyAssignorUserDataV1{}
+	data = decodeUserDataBytes(t, "AAAABgADdDEwAAAAAgAAAAIAAAAJAAN0MTIAAAACAAAAAwAAAAsAA3QxNAAAAAEAAAAEAAN0MTMAAAABAAAACwADdDE1AAAAAQAAAAwAA3QwOQAAAAEAAAAG/////w==")
+	testDecodable(t, "", req, data)
+
+	// Generation is populated
+	req = &StickyAssignorUserDataV1{}
+	data = decodeUserDataBytes(t, "AAAAAQAHdG9waWMwMQAAAAMAAAAAAAAAAQAAAAIAAAAB")
+	testDecodable(t, "", req, data)
+}
+
+func decodeUserDataBytes(t *testing.T, base64Data string) []byte {
+	data, err := base64.StdEncoding.DecodeString(base64Data)
+	if err != nil {
+		t.Errorf("Error decoding data: %v", err)
+		t.FailNow()
+	}
+	return data
+}