12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071 |
- package sarama
- import (
- "container/heap"
- "math"
- "sort"
- "strings"
- )
// Identifiers reported by the built-in balance strategies through Name().
const (
	// RangeBalanceStrategyName identifies BalanceStrategyRange.
	RangeBalanceStrategyName = "range"

	// RoundRobinBalanceStrategyName identifies BalanceStrategyRoundRobin.
	RoundRobinBalanceStrategyName = "roundrobin"

	// StickyBalanceStrategyName identifies BalanceStrategySticky.
	StickyBalanceStrategyName = "sticky"
)

// defaultGeneration is the generation recorded for sticky user data that
// carries no generation of its own.
const defaultGeneration = -1
// BalanceStrategyPlan maps a member ID to the partitions it has been given,
// grouped by topic: memberID -> topic -> partitions.
type BalanceStrategyPlan map[string]map[string][]int32

// Add appends partitions to the list already planned for memberID on topic.
// Calling it without any partitions leaves the plan untouched, so no empty
// member or topic entries are created.
func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
	if len(partitions) == 0 {
		return
	}

	byTopic, known := p[memberID]
	if !known {
		byTopic = make(map[string][]int32, 1)
		p[memberID] = byTopic
	}
	byTopic[topic] = append(byTopic[topic], partitions...)
}
- type BalanceStrategy interface {
-
- Name() string
-
-
- Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
-
-
- AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
- }
- var BalanceStrategyRange = &balanceStrategy{
- name: RangeBalanceStrategyName,
- coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
- step := float64(len(partitions)) / float64(len(memberIDs))
- for i, memberID := range memberIDs {
- pos := float64(i)
- min := int(math.Floor(pos*step + 0.5))
- max := int(math.Floor((pos+1)*step + 0.5))
- plan.Add(memberID, topic, partitions[min:max]...)
- }
- },
- }
- var BalanceStrategyRoundRobin = &balanceStrategy{
- name: RoundRobinBalanceStrategyName,
- coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
- for i, part := range partitions {
- memberID := memberIDs[i%len(memberIDs)]
- plan.Add(memberID, topic, part)
- }
- },
- }
- var BalanceStrategySticky = &stickyBalanceStrategy{}
- type balanceStrategy struct {
- name string
- coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
- }
- func (s *balanceStrategy) Name() string { return s.name }
- func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
-
- mbt := make(map[string][]string)
- for memberID, meta := range members {
- for _, topic := range meta.Topics {
- mbt[topic] = append(mbt[topic], memberID)
- }
- }
-
- for topic, memberIDs := range mbt {
- sort.Sort(&balanceStrategySortable{
- topic: topic,
- memberIDs: memberIDs,
- })
- }
-
- plan := make(BalanceStrategyPlan, len(members))
- for topic, memberIDs := range mbt {
- s.coreFn(plan, memberIDs, topic, topics[topic])
- }
- return plan, nil
- }
- func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
- return nil, nil
- }
// balanceStrategySortable orders member IDs by the hash of (topic, memberID).
// It satisfies the Len/Swap/Less method set used by sort.Sort.
type balanceStrategySortable struct {
	topic     string
	memberIDs []string
}

// Len reports the number of member IDs.
func (p balanceStrategySortable) Len() int {
	return len(p.memberIDs)
}

// Swap exchanges the member IDs at positions i and j.
func (p balanceStrategySortable) Swap(i, j int) {
	ids := p.memberIDs
	ids[i], ids[j] = ids[j], ids[i]
}

// Less reports whether the member at i hashes lower than the member at j for
// this topic.
func (p balanceStrategySortable) Less(i, j int) bool {
	left := balanceStrategyHashValue(p.topic, p.memberIDs[i])
	right := balanceStrategyHashValue(p.topic, p.memberIDs[j])
	return left < right
}

// balanceStrategyHashValue folds all runes of the given strings, in order,
// into a 32-bit hash: starting from 2166136261, each rune is XOR-ed in and the
// result multiplied by 16777619 (wrapping at 32 bits).
func balanceStrategyHashValue(vv ...string) uint32 {
	const (
		offsetBasis uint32 = 2166136261
		prime       uint32 = 16777619
	)

	hash := offsetBasis
	for _, part := range vv {
		for _, r := range part {
			hash = (hash ^ uint32(r)) * prime
		}
	}
	return hash
}
- type stickyBalanceStrategy struct {
- movements partitionMovements
- }
- func (s *stickyBalanceStrategy) Name() string { return StickyBalanceStrategyName }
- // Plan seeds the assignment from the user data carried by members (see
- // prepopulateCurrentAssignments), collects the partitions that currently have
- // no valid owner, lets s.balance distribute and rebalance them, and converts
- // the outcome into a BalanceStrategyPlan. A non-nil error is returned only
- // when prepopulateCurrentAssignments fails.
- func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
- // Start from empty movement bookkeeping so nothing from an earlier call leaks in.
- s.movements = partitionMovements{
- Movements: make(map[topicPartitionAssignment]consumerPair),
- PartitionMovementsByTopic: make(map[string]map[consumerPair]map[topicPartitionAssignment]bool),
- }
- // Recover the current and previous ownership reported by the members.
- currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members)
- if err != nil {
- return nil, err
- }
- // No recovered ownership at all means this is a fresh assignment.
- isFreshAssignment := false
- if len(currentAssignment) == 0 {
- isFreshAssignment = true
- }
- // Register every known partition, initially with no potential consumers.
- partition2AllPotentialConsumers := make(map[topicPartitionAssignment][]string)
- for topic, partitions := range topics {
- for _, partition := range partitions {
- partition2AllPotentialConsumers[topicPartitionAssignment{Topic: topic, Partition: partition}] = []string{}
- }
- }
- // Build both directions of the subscription relation: the partitions each
- // member could consume and the members that could consume each partition.
- consumer2AllPotentialPartitions := make(map[string][]topicPartitionAssignment, len(members))
- for memberID, meta := range members {
- consumer2AllPotentialPartitions[memberID] = make([]topicPartitionAssignment, 0)
- for _, topicSubscription := range meta.Topics {
- // Subscriptions to topics that are absent from topics are ignored.
- if _, found := topics[topicSubscription]; found {
- for _, partition := range topics[topicSubscription] {
- topicPartition := topicPartitionAssignment{Topic: topicSubscription, Partition: partition}
- consumer2AllPotentialPartitions[memberID] = append(consumer2AllPotentialPartitions[memberID], topicPartition)
- partition2AllPotentialConsumers[topicPartition] = append(partition2AllPotentialConsumers[topicPartition], memberID)
- }
- }
- }
- // Make sure every member has an entry in currentAssignment, even if empty.
- if _, exists := currentAssignment[memberID]; !exists {
- currentAssignment[memberID] = make([]topicPartitionAssignment, 0)
- }
- }
- // Work out which partitions keep their owner and which must be (re)assigned.
- currentPartitionConsumers := make(map[topicPartitionAssignment]string, len(currentAssignment))
- unvisitedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
- for partition := range partition2AllPotentialConsumers {
- unvisitedPartitions[partition] = true
- }
- var unassignedPartitions []topicPartitionAssignment
- for memberID, partitions := range currentAssignment {
- var keepPartitions []topicPartitionAssignment
- for _, partition := range partitions {
- // A partition that is not part of topics any more is dropped from the
- // member's assignment without being queued for reassignment.
- if _, exists := partition2AllPotentialConsumers[partition]; !exists {
- continue
- }
- delete(unvisitedPartitions, partition)
- currentPartitionConsumers[partition] = memberID
- if !strsContains(members[memberID].Topics, partition.Topic) {
- unassignedPartitions = append(unassignedPartitions, partition)
- continue
- }
- keepPartitions = append(keepPartitions, partition)
- }
- currentAssignment[memberID] = keepPartitions
- }
- for unvisited := range unvisitedPartitions {
- unassignedPartitions = append(unassignedPartitions, unvisited)
- }
- // Decide the order in which partitions are considered for reassignment.
- sortedPartitions := sortPartitions(currentAssignment, prevAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions)
- // Order the members by how many partitions they currently hold (see
- // sortMemberIDsByPartitionAssignments) and run the balancing pass, which
- // updates currentAssignment.
- //
- //
- sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
- s.balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumers)
- // Convert to the plan format; members without partitions still get an (empty) entry.
- plan := make(BalanceStrategyPlan, len(currentAssignment))
- for memberID, assignments := range currentAssignment {
- if len(assignments) == 0 {
- plan[memberID] = make(map[string][]int32, 0)
- } else {
- for _, assignment := range assignments {
- plan.Add(memberID, assignment.Topic, assignment.Partition)
- }
- }
- }
- return plan, nil
- }
- func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
- return encode(&StickyAssignorUserDataV1{
- Topics: topics,
- Generation: generationID,
- }, nil)
- }
// strsContains reports whether value is one of the elements of s.
func strsContains(s []string, value string) bool {
	for idx := 0; idx < len(s); idx++ {
		if s[idx] == value {
			return true
		}
	}
	return false
}
- func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedPartitions []topicPartitionAssignment, unassignedPartitions []topicPartitionAssignment, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) {
- initializing := false
- if len(sortedCurrentSubscriptions) == 0 || len(currentAssignment[sortedCurrentSubscriptions[0]]) == 0 {
- initializing = true
- }
-
- for _, partition := range unassignedPartitions {
-
- if len(partition2AllPotentialConsumers[partition]) == 0 {
- continue
- }
- sortedCurrentSubscriptions = assignPartition(partition, sortedCurrentSubscriptions, currentAssignment, consumer2AllPotentialPartitions, currentPartitionConsumer)
- }
-
- for partition := range partition2AllPotentialConsumers {
- if !canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
- sortedPartitions = removeTopicPartitionFromMemberAssignments(sortedPartitions, partition)
- }
- }
-
- fixedAssignments := make(map[string][]topicPartitionAssignment)
- for memberID := range consumer2AllPotentialPartitions {
- if !canConsumerParticipateInReassignment(memberID, currentAssignment, consumer2AllPotentialPartitions, partition2AllPotentialConsumers) {
- fixedAssignments[memberID] = currentAssignment[memberID]
- delete(currentAssignment, memberID)
- sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
- }
- }
-
- preBalanceAssignment := deepCopyAssignment(currentAssignment)
- preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer))
- for k, v := range currentPartitionConsumer {
- preBalancePartitionConsumers[k] = v
- }
- reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)
-
-
- if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) {
- currentAssignment = deepCopyAssignment(preBalanceAssignment)
- currentPartitionConsumer = make(map[topicPartitionAssignment]string, len(preBalancePartitionConsumers))
- for k, v := range preBalancePartitionConsumers {
- currentPartitionConsumer[k] = v
- }
- }
-
- for consumer, assignments := range fixedAssignments {
- currentAssignment[consumer] = assignments
- }
- }
- func getBalanceScore(assignment map[string][]topicPartitionAssignment) int {
- consumer2AssignmentSize := make(map[string]int, len(assignment))
- for memberID, partitions := range assignment {
- consumer2AssignmentSize[memberID] = len(partitions)
- }
- var score float64
- for memberID, consumerAssignmentSize := range consumer2AssignmentSize {
- delete(consumer2AssignmentSize, memberID)
- for _, otherConsumerAssignmentSize := range consumer2AssignmentSize {
- score += math.Abs(float64(consumerAssignmentSize - otherConsumerAssignmentSize))
- }
- }
- return int(score)
- }
- func isBalanced(currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, allSubscriptions map[string][]topicPartitionAssignment) bool {
- sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
- min := len(currentAssignment[sortedCurrentSubscriptions[0]])
- max := len(currentAssignment[sortedCurrentSubscriptions[len(sortedCurrentSubscriptions)-1]])
- if min >= max-1 {
-
- return true
- }
-
- allPartitions := make(map[topicPartitionAssignment]string)
- for memberID, partitions := range currentAssignment {
- for _, partition := range partitions {
- if _, exists := allPartitions[partition]; exists {
- Logger.Printf("Topic %s Partition %d is assigned more than one consumer", partition.Topic, partition.Partition)
- }
- allPartitions[partition] = memberID
- }
- }
-
-
- for _, memberID := range sortedCurrentSubscriptions {
- consumerPartitions := currentAssignment[memberID]
- consumerPartitionCount := len(consumerPartitions)
-
- if consumerPartitionCount == len(allSubscriptions[memberID]) {
- continue
- }
-
- potentialTopicPartitions := allSubscriptions[memberID]
- for _, partition := range potentialTopicPartitions {
- if !memberAssignmentsIncludeTopicPartition(currentAssignment[memberID], partition) {
- otherConsumer := allPartitions[partition]
- otherConsumerPartitionCount := len(currentAssignment[otherConsumer])
- if consumerPartitionCount < otherConsumerPartitionCount {
- return false
- }
- }
- }
- }
- return true
- }
- func (s *stickyBalanceStrategy) performReassignments(reassignablePartitions []topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) bool {
- reassignmentPerformed := false
- modified := false
-
- for {
- modified = false
-
-
- for _, partition := range reassignablePartitions {
- if isBalanced(currentAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions) {
- break
- }
-
- if len(partition2AllPotentialConsumers[partition]) <= 1 {
- Logger.Printf("Expected more than one potential consumer for partition %s topic %d", partition.Topic, partition.Partition)
- }
-
- consumer := currentPartitionConsumer[partition]
- if consumer == "" {
- Logger.Printf("Expected topic %s partition %d to be assigned to a consumer", partition.Topic, partition.Partition)
- }
- if _, exists := prevAssignment[partition]; exists {
- if len(currentAssignment[consumer]) > (len(currentAssignment[prevAssignment[partition].MemberID]) + 1) {
- sortedCurrentSubscriptions = s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, prevAssignment[partition].MemberID)
- reassignmentPerformed = true
- modified = true
- continue
- }
- }
-
- for _, otherConsumer := range partition2AllPotentialConsumers[partition] {
- if len(currentAssignment[consumer]) > (len(currentAssignment[otherConsumer]) + 1) {
- sortedCurrentSubscriptions = s.reassignPartitionToNewConsumer(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions)
- reassignmentPerformed = true
- modified = true
- break
- }
- }
- }
- if !modified {
- return reassignmentPerformed
- }
- }
- }
- func (s *stickyBalanceStrategy) reassignPartitionToNewConsumer(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []string {
- for _, anotherConsumer := range sortedCurrentSubscriptions {
- if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[anotherConsumer], partition) {
- return s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, anotherConsumer)
- }
- }
- return sortedCurrentSubscriptions
- }
- func (s *stickyBalanceStrategy) reassignPartition(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, newConsumer string) []string {
- consumer := currentPartitionConsumer[partition]
-
- partitionToBeMoved := s.movements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer)
- return s.processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer)
- }
- func (s *stickyBalanceStrategy) processPartitionMovement(partition topicPartitionAssignment, newConsumer string, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
- oldConsumer := currentPartitionConsumer[partition]
- s.movements.movePartition(partition, oldConsumer, newConsumer)
- currentAssignment[oldConsumer] = removeTopicPartitionFromMemberAssignments(currentAssignment[oldConsumer], partition)
- currentAssignment[newConsumer] = append(currentAssignment[newConsumer], partition)
- currentPartitionConsumer[partition] = newConsumer
- return sortMemberIDsByPartitionAssignments(currentAssignment)
- }
- func canConsumerParticipateInReassignment(memberID string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
- currentPartitions := currentAssignment[memberID]
- currentAssignmentSize := len(currentPartitions)
- maxAssignmentSize := len(consumer2AllPotentialPartitions[memberID])
- if currentAssignmentSize > maxAssignmentSize {
- Logger.Printf("The consumer %s is assigned more partitions than the maximum possible", memberID)
- }
- if currentAssignmentSize < maxAssignmentSize {
-
- return true
- }
- for _, partition := range currentPartitions {
- if canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
- return true
- }
- }
- return false
- }
- func canTopicPartitionParticipateInReassignment(partition topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
- return len(partition2AllPotentialConsumers[partition]) >= 2
- }
- func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscriptions []string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
- for _, memberID := range sortedCurrentSubscriptions {
- if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[memberID], partition) {
- currentAssignment[memberID] = append(currentAssignment[memberID], partition)
- currentPartitionConsumer[partition] = memberID
- break
- }
- }
- return sortMemberIDsByPartitionAssignments(currentAssignment)
- }
- func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
- userDataV1 := &StickyAssignorUserDataV1{}
- if err := decode(userDataBytes, userDataV1); err != nil {
- userDataV0 := &StickyAssignorUserDataV0{}
- if err := decode(userDataBytes, userDataV0); err != nil {
- return nil, err
- }
- return userDataV0, nil
- }
- return userDataV1, nil
- }
- func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) map[string][]topicPartitionAssignment {
- assignments := deepCopyAssignment(currentAssignment)
- for memberID, partitions := range assignments {
-
- i := 0
- for _, partition := range partitions {
- if _, exists := partition2AllPotentialConsumers[partition]; exists {
- partitions[i] = partition
- i++
- }
- }
- assignments[memberID] = partitions[:i]
- }
- return assignments
- }
- func removeTopicPartitionFromMemberAssignments(assignments []topicPartitionAssignment, topic topicPartitionAssignment) []topicPartitionAssignment {
- for i, assignment := range assignments {
- if assignment == topic {
- return append(assignments[:i], assignments[i+1:]...)
- }
- }
- return assignments
- }
- func memberAssignmentsIncludeTopicPartition(assignments []topicPartitionAssignment, topic topicPartitionAssignment) bool {
- for _, assignment := range assignments {
- if assignment == topic {
- return true
- }
- }
- return false
- }
- // sortPartitions returns all partitions known to partition2AllPotentialConsumers
- // in the order in which they should be considered for reassignment.
- //
- // When this is not a fresh assignment and all members share identical
- // subscriptions, partitions are taken one at a time from the member at the top
- // of an assignmentPriorityQueue (its ordering is defined outside this
- // function), preferring a partition listed in
- // partitionsWithADifferentPreviousAssignment; partitions not assigned to anyone
- // follow in map-iteration order. Otherwise the order comes from
- // sortPartitionsByPotentialConsumerAssignments.
- func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment {
- unassignedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
- for partition := range partition2AllPotentialConsumers {
- unassignedPartitions[partition] = true
- }
- sortedPartitions := make([]topicPartitionAssignment, 0)
- if !isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions) {
- // Work on a filtered copy of the current assignment: the loop below
- // consumes the per-member slices, and partitions that are not keys of
- // partition2AllPotentialConsumers must not be emitted.
- assignments := filterAssignedPartitions(currentAssignment, partition2AllPotentialConsumers)
- // Put every member, together with its remaining partitions, into the
- // priority queue.
- pq := make(assignmentPriorityQueue, len(assignments))
- i := 0
- for consumerID, consumerAssignments := range assignments {
- pq[i] = &consumerGroupMember{
- id: consumerID,
- assignments: consumerAssignments,
- }
- i++
- }
- heap.Init(&pq)
- for {
- // Stop once every member has been drained and popped.
- if pq.Len() == 0 {
- break
- }
- member := pq[0]
- // Prefer the first partition that has a recorded previous owner; fall back to index 0.
- var prevPartitionIndex int
- for i, partition := range member.assignments {
- if _, exists := partitionsWithADifferentPreviousAssignment[partition]; exists {
- prevPartitionIndex = i
- break
- }
- }
- if len(member.assignments) > 0 {
- partition := member.assignments[prevPartitionIndex]
- sortedPartitions = append(sortedPartitions, partition)
- delete(unassignedPartitions, partition)
- if prevPartitionIndex == 0 {
- member.assignments = member.assignments[1:]
- } else {
- member.assignments = append(member.assignments[:prevPartitionIndex], member.assignments[prevPartitionIndex+1:]...)
- }
- heap.Fix(&pq, 0)
- } else {
- heap.Pop(&pq)
- }
- }
- for partition := range unassignedPartitions {
- sortedPartitions = append(sortedPartitions, partition)
- }
- } else {
- // Fresh assignment or differing subscriptions: order by number of potential consumers.
- sortedPartitions = sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers)
- }
- return sortedPartitions
- }
- func sortMemberIDsByPartitionAssignments(assignments map[string][]topicPartitionAssignment) []string {
-
- sortedMemberIDs := make([]string, 0, len(assignments))
- for memberID := range assignments {
- sortedMemberIDs = append(sortedMemberIDs, memberID)
- }
- sort.SliceStable(sortedMemberIDs, func(i, j int) bool {
- ret := len(assignments[sortedMemberIDs[i]]) - len(assignments[sortedMemberIDs[j]])
- if ret == 0 {
- return sortedMemberIDs[i] < sortedMemberIDs[j]
- }
- return len(assignments[sortedMemberIDs[i]]) < len(assignments[sortedMemberIDs[j]])
- })
- return sortedMemberIDs
- }
- func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers map[topicPartitionAssignment][]string) []topicPartitionAssignment {
-
- sortedPartionIDs := make([]topicPartitionAssignment, len(partition2AllPotentialConsumers))
- i := 0
- for partition := range partition2AllPotentialConsumers {
- sortedPartionIDs[i] = partition
- i++
- }
- sort.Slice(sortedPartionIDs, func(i, j int) bool {
- if len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) == len(partition2AllPotentialConsumers[sortedPartionIDs[j]]) {
- ret := strings.Compare(sortedPartionIDs[i].Topic, sortedPartionIDs[j].Topic)
- if ret == 0 {
- return sortedPartionIDs[i].Partition < sortedPartionIDs[j].Partition
- }
- return ret < 0
- }
- return len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) < len(partition2AllPotentialConsumers[sortedPartionIDs[j]])
- })
- return sortedPartionIDs
- }
- func deepCopyPartitions(src []topicPartitionAssignment) []topicPartitionAssignment {
- dst := make([]topicPartitionAssignment, len(src))
- for i, partition := range src {
- dst[i] = partition
- }
- return dst
- }
- func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
- copy := make(map[string][]topicPartitionAssignment, len(assignment))
- for memberID, subscriptions := range assignment {
- copy[memberID] = append(subscriptions[:0:0], subscriptions...)
- }
- return copy
- }
- func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
- curMembers := make(map[string]int)
- for _, cur := range partition2AllPotentialConsumers {
- if len(curMembers) == 0 {
- for _, curMembersElem := range cur {
- curMembers[curMembersElem]++
- }
- continue
- }
- if len(curMembers) != len(cur) {
- return false
- }
- yMap := make(map[string]int)
- for _, yElem := range cur {
- yMap[yElem]++
- }
- for curMembersMapKey, curMembersMapVal := range curMembers {
- if yMap[curMembersMapKey] != curMembersMapVal {
- return false
- }
- }
- }
- curPartitions := make(map[topicPartitionAssignment]int)
- for _, cur := range consumer2AllPotentialPartitions {
- if len(curPartitions) == 0 {
- for _, curPartitionElem := range cur {
- curPartitions[curPartitionElem]++
- }
- continue
- }
- if len(curPartitions) != len(cur) {
- return false
- }
- yMap := make(map[topicPartitionAssignment]int)
- for _, yElem := range cur {
- yMap[yElem]++
- }
- for curMembersMapKey, curMembersMapVal := range curPartitions {
- if yMap[curMembersMapKey] != curMembersMapVal {
- return false
- }
- }
- }
- return true
- }
- // prepopulateCurrentAssignments reconstructs partition ownership from the user
- // data that each member carries in its metadata.
- //
- // It returns two maps: the current assignment (member ID -> partitions, where
- // each partition goes to the member that reported it with the highest
- // generation) and the previous assignment (partition -> member and generation
- // of the second-highest report, present only when a partition was reported
- // under more than one generation). An error is returned as soon as the user
- // data of any member cannot be deserialized.
- func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {
- currentAssignment := make(map[string][]topicPartitionAssignment)
- prevAssignment := make(map[topicPartitionAssignment]consumerGenerationPair)
- // For every reported partition, remember which member claimed it in which generation.
- sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
- for memberID, meta := range members {
- consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
- if err != nil {
- return nil, nil, err
- }
- for _, partition := range consumerUserData.partitions() {
- if consumers, exists := sortedPartitionConsumersByGeneration[partition]; exists {
- if consumerUserData.hasGeneration() {
- if _, generationExists := consumers[consumerUserData.generation()]; generationExists {
- // Two members claim the same partition in the same generation: keep
- // the claim recorded first, log the conflict and ignore this one.
- Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, consumerUserData.generation())
- continue
- } else {
- consumers[consumerUserData.generation()] = memberID
- }
- } else {
- consumers[defaultGeneration] = memberID
- }
- } else {
- generation := defaultGeneration
- if consumerUserData.hasGeneration() {
- generation = consumerUserData.generation()
- }
- sortedPartitionConsumersByGeneration[partition] = map[int]string{generation: memberID}
- }
- }
- }
- // Resolve every partition: the claim with the highest generation is the
- // current owner, the claim with the next-highest generation the previous one.
- for partition, consumers := range sortedPartitionConsumersByGeneration {
- // Collect the generations and sort them in descending order.
- var generations []int
- for generation := range consumers {
- generations = append(generations, generation)
- }
- sort.Sort(sort.Reverse(sort.IntSlice(generations)))
- consumer := consumers[generations[0]]
- if _, exists := currentAssignment[consumer]; !exists {
- currentAssignment[consumer] = []topicPartitionAssignment{partition}
- } else {
- currentAssignment[consumer] = append(currentAssignment[consumer], partition)
- }
- // Record the runner-up, if any, as the previous owner.
- if len(generations) > 1 {
- prevAssignment[partition] = consumerGenerationPair{
- MemberID: consumers[generations[1]],
- Generation: generations[1],
- }
- }
- }
- return currentAssignment, prevAssignment, nil
- }
- type consumerGenerationPair struct {
- MemberID string
- Generation int
- }
- type consumerPair struct {
- SrcMemberID string
- DstMemberID string
- }
- type partitionMovements struct {
- PartitionMovementsByTopic map[string]map[consumerPair]map[topicPartitionAssignment]bool
- Movements map[topicPartitionAssignment]consumerPair
- }
- func (p *partitionMovements) removeMovementRecordOfPartition(partition topicPartitionAssignment) consumerPair {
- pair := p.Movements[partition]
- delete(p.Movements, partition)
- partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
- delete(partitionMovementsForThisTopic[pair], partition)
- if len(partitionMovementsForThisTopic[pair]) == 0 {
- delete(partitionMovementsForThisTopic, pair)
- }
- if len(p.PartitionMovementsByTopic[partition.Topic]) == 0 {
- delete(p.PartitionMovementsByTopic, partition.Topic)
- }
- return pair
- }
- func (p *partitionMovements) addPartitionMovementRecord(partition topicPartitionAssignment, pair consumerPair) {
- p.Movements[partition] = pair
- if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
- p.PartitionMovementsByTopic[partition.Topic] = make(map[consumerPair]map[topicPartitionAssignment]bool)
- }
- partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
- if _, exists := partitionMovementsForThisTopic[pair]; !exists {
- partitionMovementsForThisTopic[pair] = make(map[topicPartitionAssignment]bool)
- }
- partitionMovementsForThisTopic[pair][partition] = true
- }
- func (p *partitionMovements) movePartition(partition topicPartitionAssignment, oldConsumer, newConsumer string) {
- pair := consumerPair{
- SrcMemberID: oldConsumer,
- DstMemberID: newConsumer,
- }
- if _, exists := p.Movements[partition]; exists {
-
- existingPair := p.removeMovementRecordOfPartition(partition)
- if existingPair.DstMemberID != oldConsumer {
- Logger.Printf("Existing pair DstMemberID %s was not equal to the oldConsumer ID %s", existingPair.DstMemberID, oldConsumer)
- }
- if existingPair.SrcMemberID != newConsumer {
-
- p.addPartitionMovementRecord(partition, consumerPair{
- SrcMemberID: existingPair.SrcMemberID,
- DstMemberID: newConsumer,
- })
- }
- } else {
- p.addPartitionMovementRecord(partition, pair)
- }
- }
- func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicPartitionAssignment, oldConsumer, newConsumer string) topicPartitionAssignment {
- if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
- return partition
- }
- if _, exists := p.Movements[partition]; exists {
-
- if oldConsumer != p.Movements[partition].DstMemberID {
- Logger.Printf("Partition movement DstMemberID %s was not equal to the oldConsumer ID %s", p.Movements[partition].DstMemberID, oldConsumer)
- }
- oldConsumer = p.Movements[partition].SrcMemberID
- }
- partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
- reversePair := consumerPair{
- SrcMemberID: newConsumer,
- DstMemberID: oldConsumer,
- }
- if _, exists := partitionMovementsForThisTopic[reversePair]; !exists {
- return partition
- }
- var reversePairPartition topicPartitionAssignment
- for otherPartition := range partitionMovementsForThisTopic[reversePair] {
- reversePairPartition = otherPartition
- }
- return reversePairPartition
- }
- func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) {
- if src == dst {
- return currentPath, false
- }
- if len(pairs) == 0 {
- return currentPath, false
- }
- for _, pair := range pairs {
- if src == pair.SrcMemberID && dst == pair.DstMemberID {
- currentPath = append(currentPath, src, dst)
- return currentPath, true
- }
- }
- for _, pair := range pairs {
- if pair.SrcMemberID == src {
-
- reducedSet := make([]consumerPair, len(pairs)-1)
- i := 0
- for _, p := range pairs {
- if p != pair {
- reducedSet[i] = pair
- i++
- }
- }
- currentPath = append(currentPath, pair.SrcMemberID)
- return p.isLinked(pair.DstMemberID, dst, reducedSet, currentPath)
- }
- }
- return currentPath, false
- }
- func (p *partitionMovements) in(cycle []string, cycles [][]string) bool {
- superCycle := make([]string, len(cycle)-1)
- for i := 0; i < len(cycle)-1; i++ {
- superCycle[i] = cycle[i]
- }
- for _, c := range cycle {
- superCycle = append(superCycle, c)
- }
- for _, foundCycle := range cycles {
- if len(foundCycle) == len(cycle) && indexOfSubList(superCycle, foundCycle) != -1 {
- return true
- }
- }
- return false
- }
- func (p *partitionMovements) hasCycles(pairs []consumerPair) bool {
- cycles := make([][]string, 0)
- for _, pair := range pairs {
-
- reducedPairs := make([]consumerPair, len(pairs)-1)
- i := 0
- for _, p := range pairs {
- if p != pair {
- reducedPairs[i] = pair
- i++
- }
- }
- if path, linked := p.isLinked(pair.DstMemberID, pair.SrcMemberID, reducedPairs, []string{pair.SrcMemberID}); linked {
- if !p.in(path, cycles) {
- cycles = append(cycles, path)
- Logger.Printf("A cycle of length %d was found: %v", len(path)-1, path)
- }
- }
- }
-
-
-
- for _, cycle := range cycles {
- if len(cycle) == 3 {
- return true
- }
- }
- return false
- }
- func (p *partitionMovements) isSticky() bool {
- for topic, movements := range p.PartitionMovementsByTopic {
- movementPairs := make([]consumerPair, len(movements))
- i := 0
- for pair := range movements {
- movementPairs[i] = pair
- i++
- }
- if p.hasCycles(movementPairs) {
- Logger.Printf("Stickiness is violated for topic %s", topic)
- Logger.Printf("Partition movements for this topic occurred among the following consumer pairs: %v", movements)
- return false
- }
- }
- return true
- }
// indexOfSubList returns the index of the first occurrence of target as a
// contiguous run inside source, or -1 if there is none. An empty target
// matches at index 0.
func indexOfSubList(source []string, target []string) int {
	width := len(target)
	for start := 0; start+width <= len(source); start++ {
		matched := true
		for offset, want := range target {
			if source[start+offset] != want {
				matched = false
				break
			}
		}
		if matched {
			return start
		}
	}
	return -1
}
- type consumerGroupMember struct {
- id string
- assignments []topicPartitionAssignment
- }
- type assignmentPriorityQueue []*consumerGroupMember
- func (pq assignmentPriorityQueue) Len() int { return len(pq) }
- func (pq assignmentPriorityQueue) Less(i, j int) bool {
-
- if len(pq[i].assignments) == len(pq[j].assignments) {
- return strings.Compare(pq[i].id, pq[j].id) > 0
- }
- return len(pq[i].assignments) > len(pq[j].assignments)
- }
- func (pq assignmentPriorityQueue) Swap(i, j int) {
- pq[i], pq[j] = pq[j], pq[i]
- }
- func (pq *assignmentPriorityQueue) Push(x interface{}) {
- member := x.(*consumerGroupMember)
- *pq = append(*pq, member)
- }
- func (pq *assignmentPriorityQueue) Pop() interface{} {
- old := *pq
- n := len(old)
- member := old[n-1]
- *pq = old[0 : n-1]
- return member
- }
|