balance_strategy.go 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069
  1. package sarama
  2. import (
  3. "container/heap"
  4. "math"
  5. "sort"
  6. "strings"
  7. )
  8. const (
  9. // RangeBalanceStrategyName identifies strategies that use the range partition assignment strategy
  10. RangeBalanceStrategyName = "range"
  11. // RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy
  12. RoundRobinBalanceStrategyName = "roundrobin"
  13. // StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
  14. StickyBalanceStrategyName = "sticky"
  15. defaultGeneration = -1
  16. )
  17. // BalanceStrategyPlan is the results of any BalanceStrategy.Plan attempt.
  18. // It contains an allocation of topic/partitions by memberID in the form of
  19. // a `memberID -> topic -> partitions` map.
  20. type BalanceStrategyPlan map[string]map[string][]int32
  21. // Add assigns a topic with a number partitions to a member.
  22. func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
  23. if len(partitions) == 0 {
  24. return
  25. }
  26. if _, ok := p[memberID]; !ok {
  27. p[memberID] = make(map[string][]int32, 1)
  28. }
  29. p[memberID][topic] = append(p[memberID][topic], partitions...)
  30. }
  31. // --------------------------------------------------------------------
  32. // BalanceStrategy is used to balance topics and partitions
  33. // across members of a consumer group
  34. type BalanceStrategy interface {
  35. // Name uniquely identifies the strategy.
  36. Name() string
  37. // Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
  38. // and returns a distribution plan.
  39. Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
  40. // AssignmentData returns the serialized assignment data for the specified
  41. // memberID
  42. AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
  43. }
  44. // --------------------------------------------------------------------
  45. // BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members.
  46. // Example with one topic T with six partitions (0..5) and two members (M1, M2):
  47. // M1: {T: [0, 1, 2]}
  48. // M2: {T: [3, 4, 5]}
  49. var BalanceStrategyRange = &balanceStrategy{
  50. name: RangeBalanceStrategyName,
  51. coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
  52. step := float64(len(partitions)) / float64(len(memberIDs))
  53. for i, memberID := range memberIDs {
  54. pos := float64(i)
  55. min := int(math.Floor(pos*step + 0.5))
  56. max := int(math.Floor((pos+1)*step + 0.5))
  57. plan.Add(memberID, topic, partitions[min:max]...)
  58. }
  59. },
  60. }
  61. // BalanceStrategyRoundRobin assigns partitions to members in alternating order.
  62. // Example with topic T with six partitions (0..5) and two members (M1, M2):
  63. // M1: {T: [0, 2, 4]}
  64. // M2: {T: [1, 3, 5]}
  65. var BalanceStrategyRoundRobin = &balanceStrategy{
  66. name: RoundRobinBalanceStrategyName,
  67. coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
  68. for i, part := range partitions {
  69. memberID := memberIDs[i%len(memberIDs)]
  70. plan.Add(memberID, topic, part)
  71. }
  72. },
  73. }
  74. // BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments
  75. // while maintain a balanced partition distribution.
  76. // Example with topic T with six partitions (0..5) and two members (M1, M2):
  77. // M1: {T: [0, 2, 4]}
  78. // M2: {T: [1, 3, 5]}
  79. //
  80. // On reassignment with an additional consumer, you might get an assignment plan like:
  81. // M1: {T: [0, 2]}
  82. // M2: {T: [1, 3]}
  83. // M3: {T: [4, 5]}
  84. //
  85. var BalanceStrategySticky = &stickyBalanceStrategy{}
  86. // --------------------------------------------------------------------
  87. type balanceStrategy struct {
  88. name string
  89. coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
  90. }
  91. // Name implements BalanceStrategy.
  92. func (s *balanceStrategy) Name() string { return s.name }
  93. // Plan implements BalanceStrategy.
  94. func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
  95. // Build members by topic map
  96. mbt := make(map[string][]string)
  97. for memberID, meta := range members {
  98. for _, topic := range meta.Topics {
  99. mbt[topic] = append(mbt[topic], memberID)
  100. }
  101. }
  102. // Sort members for each topic
  103. for topic, memberIDs := range mbt {
  104. sort.Sort(&balanceStrategySortable{
  105. topic: topic,
  106. memberIDs: memberIDs,
  107. })
  108. }
  109. // Assemble plan
  110. plan := make(BalanceStrategyPlan, len(members))
  111. for topic, memberIDs := range mbt {
  112. s.coreFn(plan, memberIDs, topic, topics[topic])
  113. }
  114. return plan, nil
  115. }
  116. // AssignmentData simple strategies do not require any shared assignment data
  117. func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
  118. return nil, nil
  119. }
  120. type balanceStrategySortable struct {
  121. topic string
  122. memberIDs []string
  123. }
  124. func (p balanceStrategySortable) Len() int { return len(p.memberIDs) }
  125. func (p balanceStrategySortable) Swap(i, j int) {
  126. p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]
  127. }
  128. func (p balanceStrategySortable) Less(i, j int) bool {
  129. return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])
  130. }
  131. func balanceStrategyHashValue(vv ...string) uint32 {
  132. h := uint32(2166136261)
  133. for _, s := range vv {
  134. for _, c := range s {
  135. h ^= uint32(c)
  136. h *= 16777619
  137. }
  138. }
  139. return h
  140. }
  141. type stickyBalanceStrategy struct {
  142. movements partitionMovements
  143. }
  144. // Name implements BalanceStrategy.
  145. func (s *stickyBalanceStrategy) Name() string { return StickyBalanceStrategyName }
  146. // Plan implements BalanceStrategy.
  147. func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
  148. // track partition movements during generation of the partition assignment plan
  149. s.movements = partitionMovements{
  150. Movements: make(map[topicPartitionAssignment]consumerPair),
  151. PartitionMovementsByTopic: make(map[string]map[consumerPair]map[topicPartitionAssignment]bool),
  152. }
  153. // prepopulate the current assignment state from userdata on the consumer group members
  154. currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members)
  155. if err != nil {
  156. return nil, err
  157. }
  158. // determine if we're dealing with a completely fresh assignment, or if there's existing assignment state
  159. isFreshAssignment := false
  160. if len(currentAssignment) == 0 {
  161. isFreshAssignment = true
  162. }
  163. // create a mapping of all current topic partitions and the consumers that can be assigned to them
  164. partition2AllPotentialConsumers := make(map[topicPartitionAssignment][]string)
  165. for topic, partitions := range topics {
  166. for _, partition := range partitions {
  167. partition2AllPotentialConsumers[topicPartitionAssignment{Topic: topic, Partition: partition}] = []string{}
  168. }
  169. }
  170. // create a mapping of all consumers to all potential topic partitions that can be assigned to them
  171. // also, populate the mapping of partitions to potential consumers
  172. consumer2AllPotentialPartitions := make(map[string][]topicPartitionAssignment, len(members))
  173. for memberID, meta := range members {
  174. consumer2AllPotentialPartitions[memberID] = make([]topicPartitionAssignment, 0)
  175. for _, topicSubscription := range meta.Topics {
  176. // only evaluate topic subscriptions that are present in the supplied topics map
  177. if _, found := topics[topicSubscription]; found {
  178. for _, partition := range topics[topicSubscription] {
  179. topicPartition := topicPartitionAssignment{Topic: topicSubscription, Partition: partition}
  180. consumer2AllPotentialPartitions[memberID] = append(consumer2AllPotentialPartitions[memberID], topicPartition)
  181. partition2AllPotentialConsumers[topicPartition] = append(partition2AllPotentialConsumers[topicPartition], memberID)
  182. }
  183. }
  184. }
  185. // add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist
  186. if _, exists := currentAssignment[memberID]; !exists {
  187. currentAssignment[memberID] = make([]topicPartitionAssignment, 0)
  188. }
  189. }
  190. // create a mapping of each partition to its current consumer, where possible
  191. currentPartitionConsumers := make(map[topicPartitionAssignment]string, len(currentAssignment))
  192. unvisitedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
  193. for partition := range partition2AllPotentialConsumers {
  194. unvisitedPartitions[partition] = true
  195. }
  196. var unassignedPartitions []topicPartitionAssignment
  197. for memberID, partitions := range currentAssignment {
  198. var keepPartitions []topicPartitionAssignment
  199. for _, partition := range partitions {
  200. // If this partition no longer exists at all, likely due to the
  201. // topic being deleted, we remove the partition from the member.
  202. if _, exists := partition2AllPotentialConsumers[partition]; !exists {
  203. continue
  204. }
  205. delete(unvisitedPartitions, partition)
  206. currentPartitionConsumers[partition] = memberID
  207. if !strsContains(members[memberID].Topics, partition.Topic) {
  208. unassignedPartitions = append(unassignedPartitions, partition)
  209. continue
  210. }
  211. keepPartitions = append(keepPartitions, partition)
  212. }
  213. currentAssignment[memberID] = keepPartitions
  214. }
  215. for unvisited := range unvisitedPartitions {
  216. unassignedPartitions = append(unassignedPartitions, unvisited)
  217. }
  218. // sort the topic partitions in order of priority for reassignment
  219. sortedPartitions := sortPartitions(currentAssignment, prevAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions)
  220. // at this point we have preserved all valid topic partition to consumer assignments and removed
  221. // all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
  222. // to consumers so that the topic partition assignments are as balanced as possible.
  223. // an ascending sorted set of consumers based on how many topic partitions are already assigned to them
  224. sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
  225. s.balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumers)
  226. // Assemble plan
  227. plan := make(BalanceStrategyPlan, len(currentAssignment))
  228. for memberID, assignments := range currentAssignment {
  229. if len(assignments) == 0 {
  230. plan[memberID] = make(map[string][]int32)
  231. } else {
  232. for _, assignment := range assignments {
  233. plan.Add(memberID, assignment.Topic, assignment.Partition)
  234. }
  235. }
  236. }
  237. return plan, nil
  238. }
  239. // AssignmentData serializes the set of topics currently assigned to the
  240. // specified member as part of the supplied balance plan
  241. func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
  242. return encode(&StickyAssignorUserDataV1{
  243. Topics: topics,
  244. Generation: generationID,
  245. }, nil)
  246. }
  247. func strsContains(s []string, value string) bool {
  248. for _, entry := range s {
  249. if entry == value {
  250. return true
  251. }
  252. }
  253. return false
  254. }
  255. // Balance assignments across consumers for maximum fairness and stickiness.
  256. func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedPartitions []topicPartitionAssignment, unassignedPartitions []topicPartitionAssignment, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) {
  257. initializing := false
  258. if len(sortedCurrentSubscriptions) == 0 || len(currentAssignment[sortedCurrentSubscriptions[0]]) == 0 {
  259. initializing = true
  260. }
  261. // assign all unassigned partitions
  262. for _, partition := range unassignedPartitions {
  263. // skip if there is no potential consumer for the partition
  264. if len(partition2AllPotentialConsumers[partition]) == 0 {
  265. continue
  266. }
  267. sortedCurrentSubscriptions = assignPartition(partition, sortedCurrentSubscriptions, currentAssignment, consumer2AllPotentialPartitions, currentPartitionConsumer)
  268. }
  269. // narrow down the reassignment scope to only those partitions that can actually be reassigned
  270. for partition := range partition2AllPotentialConsumers {
  271. if !canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
  272. sortedPartitions = removeTopicPartitionFromMemberAssignments(sortedPartitions, partition)
  273. }
  274. }
  275. // narrow down the reassignment scope to only those consumers that are subject to reassignment
  276. fixedAssignments := make(map[string][]topicPartitionAssignment)
  277. for memberID := range consumer2AllPotentialPartitions {
  278. if !canConsumerParticipateInReassignment(memberID, currentAssignment, consumer2AllPotentialPartitions, partition2AllPotentialConsumers) {
  279. fixedAssignments[memberID] = currentAssignment[memberID]
  280. delete(currentAssignment, memberID)
  281. sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
  282. }
  283. }
  284. // create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
  285. preBalanceAssignment := deepCopyAssignment(currentAssignment)
  286. preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer))
  287. for k, v := range currentPartitionConsumer {
  288. preBalancePartitionConsumers[k] = v
  289. }
  290. reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)
  291. // if we are not preserving existing assignments and we have made changes to the current assignment
  292. // make sure we are getting a more balanced assignment; otherwise, revert to previous assignment
  293. if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) {
  294. currentAssignment = deepCopyAssignment(preBalanceAssignment)
  295. currentPartitionConsumer = make(map[topicPartitionAssignment]string, len(preBalancePartitionConsumers))
  296. for k, v := range preBalancePartitionConsumers {
  297. currentPartitionConsumer[k] = v
  298. }
  299. }
  300. // add the fixed assignments (those that could not change) back
  301. for consumer, assignments := range fixedAssignments {
  302. currentAssignment[consumer] = assignments
  303. }
  304. }
  305. // Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs.
  306. // A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
  307. // Lower balance score indicates a more balanced assignment.
  308. func getBalanceScore(assignment map[string][]topicPartitionAssignment) int {
  309. consumer2AssignmentSize := make(map[string]int, len(assignment))
  310. for memberID, partitions := range assignment {
  311. consumer2AssignmentSize[memberID] = len(partitions)
  312. }
  313. var score float64
  314. for memberID, consumerAssignmentSize := range consumer2AssignmentSize {
  315. delete(consumer2AssignmentSize, memberID)
  316. for _, otherConsumerAssignmentSize := range consumer2AssignmentSize {
  317. score += math.Abs(float64(consumerAssignmentSize - otherConsumerAssignmentSize))
  318. }
  319. }
  320. return int(score)
  321. }
  322. // Determine whether the current assignment plan is balanced.
  323. func isBalanced(currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, allSubscriptions map[string][]topicPartitionAssignment) bool {
  324. sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
  325. min := len(currentAssignment[sortedCurrentSubscriptions[0]])
  326. max := len(currentAssignment[sortedCurrentSubscriptions[len(sortedCurrentSubscriptions)-1]])
  327. if min >= max-1 {
  328. // if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true
  329. return true
  330. }
  331. // create a mapping from partitions to the consumer assigned to them
  332. allPartitions := make(map[topicPartitionAssignment]string)
  333. for memberID, partitions := range currentAssignment {
  334. for _, partition := range partitions {
  335. if _, exists := allPartitions[partition]; exists {
  336. Logger.Printf("Topic %s Partition %d is assigned more than one consumer", partition.Topic, partition.Partition)
  337. }
  338. allPartitions[partition] = memberID
  339. }
  340. }
  341. // for each consumer that does not have all the topic partitions it can get make sure none of the topic partitions it
  342. // could but did not get cannot be moved to it (because that would break the balance)
  343. for _, memberID := range sortedCurrentSubscriptions {
  344. consumerPartitions := currentAssignment[memberID]
  345. consumerPartitionCount := len(consumerPartitions)
  346. // skip if this consumer already has all the topic partitions it can get
  347. if consumerPartitionCount == len(allSubscriptions[memberID]) {
  348. continue
  349. }
  350. // otherwise make sure it cannot get any more
  351. potentialTopicPartitions := allSubscriptions[memberID]
  352. for _, partition := range potentialTopicPartitions {
  353. if !memberAssignmentsIncludeTopicPartition(currentAssignment[memberID], partition) {
  354. otherConsumer := allPartitions[partition]
  355. otherConsumerPartitionCount := len(currentAssignment[otherConsumer])
  356. if consumerPartitionCount < otherConsumerPartitionCount {
  357. return false
  358. }
  359. }
  360. }
  361. }
  362. return true
  363. }
  364. // Reassign all topic partitions that need reassignment until balanced.
  365. func (s *stickyBalanceStrategy) performReassignments(reassignablePartitions []topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) bool {
  366. reassignmentPerformed := false
  367. modified := false
  368. // repeat reassignment until no partition can be moved to improve the balance
  369. for {
  370. modified = false
  371. // reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
  372. // until the full list is processed or a balance is achieved
  373. for _, partition := range reassignablePartitions {
  374. if isBalanced(currentAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions) {
  375. break
  376. }
  377. // the partition must have at least two consumers
  378. if len(partition2AllPotentialConsumers[partition]) <= 1 {
  379. Logger.Printf("Expected more than one potential consumer for partition %s topic %d", partition.Topic, partition.Partition)
  380. }
  381. // the partition must have a consumer
  382. consumer := currentPartitionConsumer[partition]
  383. if consumer == "" {
  384. Logger.Printf("Expected topic %s partition %d to be assigned to a consumer", partition.Topic, partition.Partition)
  385. }
  386. if _, exists := prevAssignment[partition]; exists {
  387. if len(currentAssignment[consumer]) > (len(currentAssignment[prevAssignment[partition].MemberID]) + 1) {
  388. sortedCurrentSubscriptions = s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, prevAssignment[partition].MemberID)
  389. reassignmentPerformed = true
  390. modified = true
  391. continue
  392. }
  393. }
  394. // check if a better-suited consumer exists for the partition; if so, reassign it
  395. for _, otherConsumer := range partition2AllPotentialConsumers[partition] {
  396. if len(currentAssignment[consumer]) > (len(currentAssignment[otherConsumer]) + 1) {
  397. sortedCurrentSubscriptions = s.reassignPartitionToNewConsumer(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions)
  398. reassignmentPerformed = true
  399. modified = true
  400. break
  401. }
  402. }
  403. }
  404. if !modified {
  405. return reassignmentPerformed
  406. }
  407. }
  408. }
  409. // Identify a new consumer for a topic partition and reassign it.
  410. func (s *stickyBalanceStrategy) reassignPartitionToNewConsumer(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []string {
  411. for _, anotherConsumer := range sortedCurrentSubscriptions {
  412. if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[anotherConsumer], partition) {
  413. return s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, anotherConsumer)
  414. }
  415. }
  416. return sortedCurrentSubscriptions
  417. }
  418. // Reassign a specific partition to a new consumer
  419. func (s *stickyBalanceStrategy) reassignPartition(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, newConsumer string) []string {
  420. consumer := currentPartitionConsumer[partition]
  421. // find the correct partition movement considering the stickiness requirement
  422. partitionToBeMoved := s.movements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer)
  423. return s.processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer)
  424. }
  425. // Track the movement of a topic partition after assignment
  426. func (s *stickyBalanceStrategy) processPartitionMovement(partition topicPartitionAssignment, newConsumer string, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
  427. oldConsumer := currentPartitionConsumer[partition]
  428. s.movements.movePartition(partition, oldConsumer, newConsumer)
  429. currentAssignment[oldConsumer] = removeTopicPartitionFromMemberAssignments(currentAssignment[oldConsumer], partition)
  430. currentAssignment[newConsumer] = append(currentAssignment[newConsumer], partition)
  431. currentPartitionConsumer[partition] = newConsumer
  432. return sortMemberIDsByPartitionAssignments(currentAssignment)
  433. }
  434. // Determine whether a specific consumer should be considered for topic partition assignment.
  435. func canConsumerParticipateInReassignment(memberID string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
  436. currentPartitions := currentAssignment[memberID]
  437. currentAssignmentSize := len(currentPartitions)
  438. maxAssignmentSize := len(consumer2AllPotentialPartitions[memberID])
  439. if currentAssignmentSize > maxAssignmentSize {
  440. Logger.Printf("The consumer %s is assigned more partitions than the maximum possible", memberID)
  441. }
  442. if currentAssignmentSize < maxAssignmentSize {
  443. // if a consumer is not assigned all its potential partitions it is subject to reassignment
  444. return true
  445. }
  446. for _, partition := range currentPartitions {
  447. if canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
  448. return true
  449. }
  450. }
  451. return false
  452. }
  453. // Only consider reassigning those topic partitions that have two or more potential consumers.
  454. func canTopicPartitionParticipateInReassignment(partition topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
  455. return len(partition2AllPotentialConsumers[partition]) >= 2
  456. }
  457. // The assignment should improve the overall balance of the partition assignments to consumers.
  458. func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscriptions []string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
  459. for _, memberID := range sortedCurrentSubscriptions {
  460. if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[memberID], partition) {
  461. currentAssignment[memberID] = append(currentAssignment[memberID], partition)
  462. currentPartitionConsumer[partition] = memberID
  463. break
  464. }
  465. }
  466. return sortMemberIDsByPartitionAssignments(currentAssignment)
  467. }
  468. // Deserialize topic partition assignment data to aid with creation of a sticky assignment.
  469. func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
  470. userDataV1 := &StickyAssignorUserDataV1{}
  471. if err := decode(userDataBytes, userDataV1); err != nil {
  472. userDataV0 := &StickyAssignorUserDataV0{}
  473. if err := decode(userDataBytes, userDataV0); err != nil {
  474. return nil, err
  475. }
  476. return userDataV0, nil
  477. }
  478. return userDataV1, nil
  479. }
  480. // filterAssignedPartitions returns a map of consumer group members to their list of previously-assigned topic partitions, limited
  481. // to those topic partitions currently reported by the Kafka cluster.
  482. func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) map[string][]topicPartitionAssignment {
  483. assignments := deepCopyAssignment(currentAssignment)
  484. for memberID, partitions := range assignments {
  485. // perform in-place filtering
  486. i := 0
  487. for _, partition := range partitions {
  488. if _, exists := partition2AllPotentialConsumers[partition]; exists {
  489. partitions[i] = partition
  490. i++
  491. }
  492. }
  493. assignments[memberID] = partitions[:i]
  494. }
  495. return assignments
  496. }
  497. func removeTopicPartitionFromMemberAssignments(assignments []topicPartitionAssignment, topic topicPartitionAssignment) []topicPartitionAssignment {
  498. for i, assignment := range assignments {
  499. if assignment == topic {
  500. return append(assignments[:i], assignments[i+1:]...)
  501. }
  502. }
  503. return assignments
  504. }
  505. func memberAssignmentsIncludeTopicPartition(assignments []topicPartitionAssignment, topic topicPartitionAssignment) bool {
  506. for _, assignment := range assignments {
  507. if assignment == topic {
  508. return true
  509. }
  510. }
  511. return false
  512. }
  513. func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment {
  514. unassignedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
  515. for partition := range partition2AllPotentialConsumers {
  516. unassignedPartitions[partition] = true
  517. }
  518. sortedPartitions := make([]topicPartitionAssignment, 0)
  519. if !isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions) {
  520. // if this is a reassignment and the subscriptions are identical (all consumers can consumer from all topics)
  521. // then we just need to simply list partitions in a round robin fashion (from consumers with
  522. // most assigned partitions to those with least)
  523. assignments := filterAssignedPartitions(currentAssignment, partition2AllPotentialConsumers)
  524. // use priority-queue to evaluate consumer group members in descending-order based on
  525. // the number of topic partition assignments (i.e. consumers with most assignments first)
  526. pq := make(assignmentPriorityQueue, len(assignments))
  527. i := 0
  528. for consumerID, consumerAssignments := range assignments {
  529. pq[i] = &consumerGroupMember{
  530. id: consumerID,
  531. assignments: consumerAssignments,
  532. }
  533. i++
  534. }
  535. heap.Init(&pq)
  536. for {
  537. // loop until no consumer-group members remain
  538. if pq.Len() == 0 {
  539. break
  540. }
  541. member := pq[0]
  542. // partitions that were assigned to a different consumer last time
  543. var prevPartitionIndex int
  544. for i, partition := range member.assignments {
  545. if _, exists := partitionsWithADifferentPreviousAssignment[partition]; exists {
  546. prevPartitionIndex = i
  547. break
  548. }
  549. }
  550. if len(member.assignments) > 0 {
  551. partition := member.assignments[prevPartitionIndex]
  552. sortedPartitions = append(sortedPartitions, partition)
  553. delete(unassignedPartitions, partition)
  554. if prevPartitionIndex == 0 {
  555. member.assignments = member.assignments[1:]
  556. } else {
  557. member.assignments = append(member.assignments[:prevPartitionIndex], member.assignments[prevPartitionIndex+1:]...)
  558. }
  559. heap.Fix(&pq, 0)
  560. } else {
  561. heap.Pop(&pq)
  562. }
  563. }
  564. for partition := range unassignedPartitions {
  565. sortedPartitions = append(sortedPartitions, partition)
  566. }
  567. } else {
  568. // an ascending sorted set of topic partitions based on how many consumers can potentially use them
  569. sortedPartitions = sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers)
  570. }
  571. return sortedPartitions
  572. }
  573. func sortMemberIDsByPartitionAssignments(assignments map[string][]topicPartitionAssignment) []string {
  574. // sort the members by the number of partition assignments in ascending order
  575. sortedMemberIDs := make([]string, 0, len(assignments))
  576. for memberID := range assignments {
  577. sortedMemberIDs = append(sortedMemberIDs, memberID)
  578. }
  579. sort.SliceStable(sortedMemberIDs, func(i, j int) bool {
  580. ret := len(assignments[sortedMemberIDs[i]]) - len(assignments[sortedMemberIDs[j]])
  581. if ret == 0 {
  582. return sortedMemberIDs[i] < sortedMemberIDs[j]
  583. }
  584. return len(assignments[sortedMemberIDs[i]]) < len(assignments[sortedMemberIDs[j]])
  585. })
  586. return sortedMemberIDs
  587. }
  588. func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers map[topicPartitionAssignment][]string) []topicPartitionAssignment {
  589. // sort the members by the number of partition assignments in descending order
  590. sortedPartionIDs := make([]topicPartitionAssignment, len(partition2AllPotentialConsumers))
  591. i := 0
  592. for partition := range partition2AllPotentialConsumers {
  593. sortedPartionIDs[i] = partition
  594. i++
  595. }
  596. sort.Slice(sortedPartionIDs, func(i, j int) bool {
  597. if len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) == len(partition2AllPotentialConsumers[sortedPartionIDs[j]]) {
  598. ret := strings.Compare(sortedPartionIDs[i].Topic, sortedPartionIDs[j].Topic)
  599. if ret == 0 {
  600. return sortedPartionIDs[i].Partition < sortedPartionIDs[j].Partition
  601. }
  602. return ret < 0
  603. }
  604. return len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) < len(partition2AllPotentialConsumers[sortedPartionIDs[j]])
  605. })
  606. return sortedPartionIDs
  607. }
  608. func deepCopyPartitions(src []topicPartitionAssignment) []topicPartitionAssignment {
  609. dst := make([]topicPartitionAssignment, len(src))
  610. for i, partition := range src {
  611. dst[i] = partition
  612. }
  613. return dst
  614. }
  615. func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
  616. copy := make(map[string][]topicPartitionAssignment, len(assignment))
  617. for memberID, subscriptions := range assignment {
  618. copy[memberID] = append(subscriptions[:0:0], subscriptions...)
  619. }
  620. return copy
  621. }
  622. func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
  623. curMembers := make(map[string]int)
  624. for _, cur := range partition2AllPotentialConsumers {
  625. if len(curMembers) == 0 {
  626. for _, curMembersElem := range cur {
  627. curMembers[curMembersElem]++
  628. }
  629. continue
  630. }
  631. if len(curMembers) != len(cur) {
  632. return false
  633. }
  634. yMap := make(map[string]int)
  635. for _, yElem := range cur {
  636. yMap[yElem]++
  637. }
  638. for curMembersMapKey, curMembersMapVal := range curMembers {
  639. if yMap[curMembersMapKey] != curMembersMapVal {
  640. return false
  641. }
  642. }
  643. }
  644. curPartitions := make(map[topicPartitionAssignment]int)
  645. for _, cur := range consumer2AllPotentialPartitions {
  646. if len(curPartitions) == 0 {
  647. for _, curPartitionElem := range cur {
  648. curPartitions[curPartitionElem]++
  649. }
  650. continue
  651. }
  652. if len(curPartitions) != len(cur) {
  653. return false
  654. }
  655. yMap := make(map[topicPartitionAssignment]int)
  656. for _, yElem := range cur {
  657. yMap[yElem]++
  658. }
  659. for curMembersMapKey, curMembersMapVal := range curPartitions {
  660. if yMap[curMembersMapKey] != curMembersMapVal {
  661. return false
  662. }
  663. }
  664. }
  665. return true
  666. }
  667. // We need to process subscriptions' user data with each consumer's reported generation in mind
  668. // higher generations overwrite lower generations in case of a conflict
  669. // note that a conflict could exist only if user data is for different generations
  670. func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {
  671. currentAssignment := make(map[string][]topicPartitionAssignment)
  672. prevAssignment := make(map[topicPartitionAssignment]consumerGenerationPair)
  673. // for each partition we create a sorted map of its consumers by generation
  674. sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
  675. for memberID, meta := range members {
  676. consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
  677. if err != nil {
  678. return nil, nil, err
  679. }
  680. for _, partition := range consumerUserData.partitions() {
  681. if consumers, exists := sortedPartitionConsumersByGeneration[partition]; exists {
  682. if consumerUserData.hasGeneration() {
  683. if _, generationExists := consumers[consumerUserData.generation()]; generationExists {
  684. // same partition is assigned to two consumers during the same rebalance.
  685. // log a warning and skip this record
  686. Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, consumerUserData.generation())
  687. continue
  688. } else {
  689. consumers[consumerUserData.generation()] = memberID
  690. }
  691. } else {
  692. consumers[defaultGeneration] = memberID
  693. }
  694. } else {
  695. generation := defaultGeneration
  696. if consumerUserData.hasGeneration() {
  697. generation = consumerUserData.generation()
  698. }
  699. sortedPartitionConsumersByGeneration[partition] = map[int]string{generation: memberID}
  700. }
  701. }
  702. }
  703. // prevAssignment holds the prior ConsumerGenerationPair (before current) of each partition
  704. // current and previous consumers are the last two consumers of each partition in the above sorted map
  705. for partition, consumers := range sortedPartitionConsumersByGeneration {
  706. // sort consumers by generation in decreasing order
  707. var generations []int
  708. for generation := range consumers {
  709. generations = append(generations, generation)
  710. }
  711. sort.Sort(sort.Reverse(sort.IntSlice(generations)))
  712. consumer := consumers[generations[0]]
  713. if _, exists := currentAssignment[consumer]; !exists {
  714. currentAssignment[consumer] = []topicPartitionAssignment{partition}
  715. } else {
  716. currentAssignment[consumer] = append(currentAssignment[consumer], partition)
  717. }
  718. // check for previous assignment, if any
  719. if len(generations) > 1 {
  720. prevAssignment[partition] = consumerGenerationPair{
  721. MemberID: consumers[generations[1]],
  722. Generation: generations[1],
  723. }
  724. }
  725. }
  726. return currentAssignment, prevAssignment, nil
  727. }
  // consumerGenerationPair couples a consumer group member ID with a generation number.
  type consumerGenerationPair struct {
  	// MemberID is the ID of the consumer group member.
  	MemberID string
  	// Generation is the generation number associated with MemberID.
  	Generation int
  }
  // consumerPair represents a pair of Kafka consumer ids involved in a partition reassignment.
  type consumerPair struct {
  	// SrcMemberID is the member the partition moves away from.
  	SrcMemberID string
  	// DstMemberID is the member the partition moves to.
  	DstMemberID string
  }
  737. // partitionMovements maintains some data structures to simplify lookup of partition movements among consumers.
  738. type partitionMovements struct {
  739. PartitionMovementsByTopic map[string]map[consumerPair]map[topicPartitionAssignment]bool
  740. Movements map[topicPartitionAssignment]consumerPair
  741. }
  742. func (p *partitionMovements) removeMovementRecordOfPartition(partition topicPartitionAssignment) consumerPair {
  743. pair := p.Movements[partition]
  744. delete(p.Movements, partition)
  745. partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
  746. delete(partitionMovementsForThisTopic[pair], partition)
  747. if len(partitionMovementsForThisTopic[pair]) == 0 {
  748. delete(partitionMovementsForThisTopic, pair)
  749. }
  750. if len(p.PartitionMovementsByTopic[partition.Topic]) == 0 {
  751. delete(p.PartitionMovementsByTopic, partition.Topic)
  752. }
  753. return pair
  754. }
  755. func (p *partitionMovements) addPartitionMovementRecord(partition topicPartitionAssignment, pair consumerPair) {
  756. p.Movements[partition] = pair
  757. if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
  758. p.PartitionMovementsByTopic[partition.Topic] = make(map[consumerPair]map[topicPartitionAssignment]bool)
  759. }
  760. partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
  761. if _, exists := partitionMovementsForThisTopic[pair]; !exists {
  762. partitionMovementsForThisTopic[pair] = make(map[topicPartitionAssignment]bool)
  763. }
  764. partitionMovementsForThisTopic[pair][partition] = true
  765. }
  766. func (p *partitionMovements) movePartition(partition topicPartitionAssignment, oldConsumer, newConsumer string) {
  767. pair := consumerPair{
  768. SrcMemberID: oldConsumer,
  769. DstMemberID: newConsumer,
  770. }
  771. if _, exists := p.Movements[partition]; exists {
  772. // this partition has previously moved
  773. existingPair := p.removeMovementRecordOfPartition(partition)
  774. if existingPair.DstMemberID != oldConsumer {
  775. Logger.Printf("Existing pair DstMemberID %s was not equal to the oldConsumer ID %s", existingPair.DstMemberID, oldConsumer)
  776. }
  777. if existingPair.SrcMemberID != newConsumer {
  778. // the partition is not moving back to its previous consumer
  779. p.addPartitionMovementRecord(partition, consumerPair{
  780. SrcMemberID: existingPair.SrcMemberID,
  781. DstMemberID: newConsumer,
  782. })
  783. }
  784. } else {
  785. p.addPartitionMovementRecord(partition, pair)
  786. }
  787. }
  788. func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicPartitionAssignment, oldConsumer, newConsumer string) topicPartitionAssignment {
  789. if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
  790. return partition
  791. }
  792. if _, exists := p.Movements[partition]; exists {
  793. // this partition has previously moved
  794. if oldConsumer != p.Movements[partition].DstMemberID {
  795. Logger.Printf("Partition movement DstMemberID %s was not equal to the oldConsumer ID %s", p.Movements[partition].DstMemberID, oldConsumer)
  796. }
  797. oldConsumer = p.Movements[partition].SrcMemberID
  798. }
  799. partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
  800. reversePair := consumerPair{
  801. SrcMemberID: newConsumer,
  802. DstMemberID: oldConsumer,
  803. }
  804. if _, exists := partitionMovementsForThisTopic[reversePair]; !exists {
  805. return partition
  806. }
  807. var reversePairPartition topicPartitionAssignment
  808. for otherPartition := range partitionMovementsForThisTopic[reversePair] {
  809. reversePairPartition = otherPartition
  810. }
  811. return reversePairPartition
  812. }
  813. func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) {
  814. if src == dst {
  815. return currentPath, false
  816. }
  817. if len(pairs) == 0 {
  818. return currentPath, false
  819. }
  820. for _, pair := range pairs {
  821. if src == pair.SrcMemberID && dst == pair.DstMemberID {
  822. currentPath = append(currentPath, src, dst)
  823. return currentPath, true
  824. }
  825. }
  826. for _, pair := range pairs {
  827. if pair.SrcMemberID == src {
  828. // create a deep copy of the pairs, excluding the current pair
  829. reducedSet := make([]consumerPair, len(pairs)-1)
  830. i := 0
  831. for _, p := range pairs {
  832. if p != pair {
  833. reducedSet[i] = pair
  834. i++
  835. }
  836. }
  837. currentPath = append(currentPath, pair.SrcMemberID)
  838. return p.isLinked(pair.DstMemberID, dst, reducedSet, currentPath)
  839. }
  840. }
  841. return currentPath, false
  842. }
  843. func (p *partitionMovements) in(cycle []string, cycles [][]string) bool {
  844. superCycle := make([]string, len(cycle)-1)
  845. for i := 0; i < len(cycle)-1; i++ {
  846. superCycle[i] = cycle[i]
  847. }
  848. superCycle = append(superCycle, cycle...)
  849. for _, foundCycle := range cycles {
  850. if len(foundCycle) == len(cycle) && indexOfSubList(superCycle, foundCycle) != -1 {
  851. return true
  852. }
  853. }
  854. return false
  855. }
  856. func (p *partitionMovements) hasCycles(pairs []consumerPair) bool {
  857. cycles := make([][]string, 0)
  858. for _, pair := range pairs {
  859. // create a deep copy of the pairs, excluding the current pair
  860. reducedPairs := make([]consumerPair, len(pairs)-1)
  861. i := 0
  862. for _, p := range pairs {
  863. if p != pair {
  864. reducedPairs[i] = pair
  865. i++
  866. }
  867. }
  868. if path, linked := p.isLinked(pair.DstMemberID, pair.SrcMemberID, reducedPairs, []string{pair.SrcMemberID}); linked {
  869. if !p.in(path, cycles) {
  870. cycles = append(cycles, path)
  871. Logger.Printf("A cycle of length %d was found: %v", len(path)-1, path)
  872. }
  873. }
  874. }
  875. // for now we want to make sure there is no partition movements of the same topic between a pair of consumers.
  876. // the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized
  877. // tests with the given sticky algorithm) that it should not worth the added complexity of handling those cases.
  878. for _, cycle := range cycles {
  879. if len(cycle) == 3 {
  880. return true
  881. }
  882. }
  883. return false
  884. }
  885. func (p *partitionMovements) isSticky() bool {
  886. for topic, movements := range p.PartitionMovementsByTopic {
  887. movementPairs := make([]consumerPair, len(movements))
  888. i := 0
  889. for pair := range movements {
  890. movementPairs[i] = pair
  891. i++
  892. }
  893. if p.hasCycles(movementPairs) {
  894. Logger.Printf("Stickiness is violated for topic %s", topic)
  895. Logger.Printf("Partition movements for this topic occurred among the following consumer pairs: %v", movements)
  896. return false
  897. }
  898. }
  899. return true
  900. }
  // indexOfSubList returns the index of the first position in source at which target occurs as a contiguous run
  // of elements, or -1 when there is no such position. An empty target matches at index 0.
  func indexOfSubList(source []string, target []string) int {
  	lastStart := len(source) - len(target)
  	for start := 0; start <= lastStart; start++ {
  		matched := true
  		for offset, want := range target {
  			if source[start+offset] != want {
  				// element mismatch, try the next starting position
  				matched = false
  				break
  			}
  		}
  		if matched {
  			return start
  		}
  	}
  	return -1
  }
  919. type consumerGroupMember struct {
  920. id string
  921. assignments []topicPartitionAssignment
  922. }
  923. // assignmentPriorityQueue is a priority-queue of consumer group members that is sorted
  924. // in descending order (most assignments to least assignments).
  925. type assignmentPriorityQueue []*consumerGroupMember
  926. func (pq assignmentPriorityQueue) Len() int { return len(pq) }
  927. func (pq assignmentPriorityQueue) Less(i, j int) bool {
  928. // order asssignment priority queue in descending order using assignment-count/member-id
  929. if len(pq[i].assignments) == len(pq[j].assignments) {
  930. return strings.Compare(pq[i].id, pq[j].id) > 0
  931. }
  932. return len(pq[i].assignments) > len(pq[j].assignments)
  933. }
  934. func (pq assignmentPriorityQueue) Swap(i, j int) {
  935. pq[i], pq[j] = pq[j], pq[i]
  936. }
  937. func (pq *assignmentPriorityQueue) Push(x interface{}) {
  938. member := x.(*consumerGroupMember)
  939. *pq = append(*pq, member)
  940. }
  941. func (pq *assignmentPriorityQueue) Pop() interface{} {
  942. old := *pq
  943. n := len(old)
  944. member := old[n-1]
  945. *pq = old[0 : n-1]
  946. return member
  947. }