balance_strategy.go 41 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053
  1. package sarama
  2. import (
  3. "container/heap"
  4. "math"
  5. "sort"
  6. "strings"
  7. )
  8. const (
  9. // RangeBalanceStrategyName identifies strategies that use the range partition assignment strategy
  10. RangeBalanceStrategyName = "range"
  11. // RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy
  12. RoundRobinBalanceStrategyName = "roundrobin"
  13. // StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
  14. StickyBalanceStrategyName = "sticky"
  15. defaultGeneration = -1
  16. )
  17. // BalanceStrategyPlan is the results of any BalanceStrategy.Plan attempt.
  18. // It contains an allocation of topic/partitions by memberID in the form of
  19. // a `memberID -> topic -> partitions` map.
  20. type BalanceStrategyPlan map[string]map[string][]int32
  21. // Add assigns a topic with a number partitions to a member.
  22. func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
  23. if len(partitions) == 0 {
  24. return
  25. }
  26. if _, ok := p[memberID]; !ok {
  27. p[memberID] = make(map[string][]int32, 1)
  28. }
  29. p[memberID][topic] = append(p[memberID][topic], partitions...)
  30. }
  31. // --------------------------------------------------------------------
  32. // BalanceStrategy is used to balance topics and partitions
  33. // across members of a consumer group
  34. type BalanceStrategy interface {
  35. // Name uniquely identifies the strategy.
  36. Name() string
  37. // Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
  38. // and returns a distribution plan.
  39. Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
  40. }
  41. // --------------------------------------------------------------------
  42. // BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members.
  43. // Example with one topic T with six partitions (0..5) and two members (M1, M2):
  44. // M1: {T: [0, 1, 2]}
  45. // M2: {T: [3, 4, 5]}
  46. var BalanceStrategyRange = &balanceStrategy{
  47. name: RangeBalanceStrategyName,
  48. coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
  49. step := float64(len(partitions)) / float64(len(memberIDs))
  50. for i, memberID := range memberIDs {
  51. pos := float64(i)
  52. min := int(math.Floor(pos*step + 0.5))
  53. max := int(math.Floor((pos+1)*step + 0.5))
  54. plan.Add(memberID, topic, partitions[min:max]...)
  55. }
  56. },
  57. }
  58. // BalanceStrategyRoundRobin assigns partitions to members in alternating order.
  59. // Example with topic T with six partitions (0..5) and two members (M1, M2):
  60. // M1: {T: [0, 2, 4]}
  61. // M2: {T: [1, 3, 5]}
  62. var BalanceStrategyRoundRobin = &balanceStrategy{
  63. name: RoundRobinBalanceStrategyName,
  64. coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
  65. for i, part := range partitions {
  66. memberID := memberIDs[i%len(memberIDs)]
  67. plan.Add(memberID, topic, part)
  68. }
  69. },
  70. }
  71. // BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments
  72. // while maintain a balanced partition distribution.
  73. // Example with topic T with six partitions (0..5) and two members (M1, M2):
  74. // M1: {T: [0, 2, 4]}
  75. // M2: {T: [1, 3, 5]}
  76. //
  77. // On reassignment with an additional consumer, you might get an assignment plan like:
  78. // M1: {T: [0, 2]}
  79. // M2: {T: [1, 3]}
  80. // M3: {T: [4, 5]}
  81. //
  82. var BalanceStrategySticky = &stickyBalanceStrategy{}
  83. // --------------------------------------------------------------------
  84. type balanceStrategy struct {
  85. name string
  86. coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
  87. }
  88. // Name implements BalanceStrategy.
  89. func (s *balanceStrategy) Name() string { return s.name }
  90. // Plan implements BalanceStrategy.
  91. func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
  92. // Build members by topic map
  93. mbt := make(map[string][]string)
  94. for memberID, meta := range members {
  95. for _, topic := range meta.Topics {
  96. mbt[topic] = append(mbt[topic], memberID)
  97. }
  98. }
  99. // Sort members for each topic
  100. for topic, memberIDs := range mbt {
  101. sort.Sort(&balanceStrategySortable{
  102. topic: topic,
  103. memberIDs: memberIDs,
  104. })
  105. }
  106. // Assemble plan
  107. plan := make(BalanceStrategyPlan, len(members))
  108. for topic, memberIDs := range mbt {
  109. s.coreFn(plan, memberIDs, topic, topics[topic])
  110. }
  111. return plan, nil
  112. }
  113. type balanceStrategySortable struct {
  114. topic string
  115. memberIDs []string
  116. }
  117. func (p balanceStrategySortable) Len() int { return len(p.memberIDs) }
  118. func (p balanceStrategySortable) Swap(i, j int) {
  119. p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]
  120. }
  121. func (p balanceStrategySortable) Less(i, j int) bool {
  122. return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])
  123. }
  124. func balanceStrategyHashValue(vv ...string) uint32 {
  125. h := uint32(2166136261)
  126. for _, s := range vv {
  127. for _, c := range s {
  128. h ^= uint32(c)
  129. h *= 16777619
  130. }
  131. }
  132. return h
  133. }
  134. type stickyBalanceStrategy struct {
  135. movements partitionMovements
  136. }
  137. // Name implements BalanceStrategy.
  138. func (s *stickyBalanceStrategy) Name() string { return StickyBalanceStrategyName }
  139. // Plan implements BalanceStrategy.
  140. func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
  141. // track partition movements during generation of the partition assignment plan
  142. s.movements = partitionMovements{
  143. Movements: make(map[topicPartitionAssignment]consumerPair),
  144. PartitionMovementsByTopic: make(map[string]map[consumerPair]map[topicPartitionAssignment]bool),
  145. }
  146. // prepopulate the current assignment state from userdata on the consumer group members
  147. currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members)
  148. if err != nil {
  149. return nil, err
  150. }
  151. // determine if we're dealing with a completely fresh assignment, or if there's existing assignment state
  152. isFreshAssignment := false
  153. if len(currentAssignment) == 0 {
  154. isFreshAssignment = true
  155. }
  156. // create a mapping of all current topic partitions and the consumers that can be assigned to them
  157. partition2AllPotentialConsumers := make(map[topicPartitionAssignment][]string)
  158. for topic, partitions := range topics {
  159. for _, partition := range partitions {
  160. partition2AllPotentialConsumers[topicPartitionAssignment{Topic: topic, Partition: partition}] = []string{}
  161. }
  162. }
  163. // create a mapping of all consumers to all potential topic partitions that can be assigned to them
  164. // also, populate the mapping of partitions to potential consumers
  165. consumer2AllPotentialPartitions := make(map[string][]topicPartitionAssignment, len(members))
  166. for memberID, meta := range members {
  167. consumer2AllPotentialPartitions[memberID] = make([]topicPartitionAssignment, 0)
  168. for _, topicSubscription := range meta.Topics {
  169. // only evaluate topic subscriptions that are present in the supplied topics map
  170. if _, found := topics[topicSubscription]; found {
  171. for _, partition := range topics[topicSubscription] {
  172. topicPartition := topicPartitionAssignment{Topic: topicSubscription, Partition: partition}
  173. consumer2AllPotentialPartitions[memberID] = append(consumer2AllPotentialPartitions[memberID], topicPartition)
  174. partition2AllPotentialConsumers[topicPartition] = append(partition2AllPotentialConsumers[topicPartition], memberID)
  175. }
  176. }
  177. }
  178. // add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist
  179. if _, exists := currentAssignment[memberID]; !exists {
  180. currentAssignment[memberID] = make([]topicPartitionAssignment, 0)
  181. }
  182. }
  183. // create a mapping of each partition to its current consumer, where possible
  184. currentPartitionConsumers := make(map[topicPartitionAssignment]string, len(currentAssignment))
  185. unvisitedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
  186. for partition := range partition2AllPotentialConsumers {
  187. unvisitedPartitions[partition] = true
  188. }
  189. var unassignedPartitions []topicPartitionAssignment
  190. for memberID, partitions := range currentAssignment {
  191. var keepPartitions []topicPartitionAssignment
  192. for _, partition := range partitions {
  193. // If this partition no longer exists at all, likely due to the
  194. // topic being deleted, we remove the partition from the member.
  195. if _, exists := partition2AllPotentialConsumers[partition]; !exists {
  196. continue
  197. }
  198. delete(unvisitedPartitions, partition)
  199. currentPartitionConsumers[partition] = memberID
  200. if !strsContains(members[memberID].Topics, partition.Topic) {
  201. unassignedPartitions = append(unassignedPartitions, partition)
  202. continue
  203. }
  204. keepPartitions = append(keepPartitions, partition)
  205. }
  206. currentAssignment[memberID] = keepPartitions
  207. }
  208. for unvisited := range unvisitedPartitions {
  209. unassignedPartitions = append(unassignedPartitions, unvisited)
  210. }
  211. // sort the topic partitions in order of priority for reassignment
  212. sortedPartitions := sortPartitions(currentAssignment, prevAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions)
  213. // at this point we have preserved all valid topic partition to consumer assignments and removed
  214. // all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
  215. // to consumers so that the topic partition assignments are as balanced as possible.
  216. // an ascending sorted set of consumers based on how many topic partitions are already assigned to them
  217. sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
  218. s.balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumers)
  219. // Assemble plan
  220. plan := make(BalanceStrategyPlan, len(currentAssignment))
  221. for memberID, assignments := range currentAssignment {
  222. if len(assignments) == 0 {
  223. plan[memberID] = make(map[string][]int32, 0)
  224. } else {
  225. for _, assignment := range assignments {
  226. plan.Add(memberID, assignment.Topic, assignment.Partition)
  227. }
  228. }
  229. }
  230. return plan, nil
  231. }
  232. func strsContains(s []string, value string) bool {
  233. for _, entry := range s {
  234. if entry == value {
  235. return true
  236. }
  237. }
  238. return false
  239. }
  240. // Balance assignments across consumers for maximum fairness and stickiness.
  241. func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedPartitions []topicPartitionAssignment, unassignedPartitions []topicPartitionAssignment, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) {
  242. initializing := false
  243. if len(sortedCurrentSubscriptions) == 0 || len(currentAssignment[sortedCurrentSubscriptions[0]]) == 0 {
  244. initializing = true
  245. }
  246. // assign all unassigned partitions
  247. for _, partition := range unassignedPartitions {
  248. // skip if there is no potential consumer for the partition
  249. if len(partition2AllPotentialConsumers[partition]) == 0 {
  250. continue
  251. }
  252. sortedCurrentSubscriptions = assignPartition(partition, sortedCurrentSubscriptions, currentAssignment, consumer2AllPotentialPartitions, currentPartitionConsumer)
  253. }
  254. // narrow down the reassignment scope to only those partitions that can actually be reassigned
  255. for partition := range partition2AllPotentialConsumers {
  256. if !canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
  257. sortedPartitions = removeTopicPartitionFromMemberAssignments(sortedPartitions, partition)
  258. }
  259. }
  260. // narrow down the reassignment scope to only those consumers that are subject to reassignment
  261. fixedAssignments := make(map[string][]topicPartitionAssignment)
  262. for memberID := range consumer2AllPotentialPartitions {
  263. if !canConsumerParticipateInReassignment(memberID, currentAssignment, consumer2AllPotentialPartitions, partition2AllPotentialConsumers) {
  264. fixedAssignments[memberID] = currentAssignment[memberID]
  265. delete(currentAssignment, memberID)
  266. sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
  267. }
  268. }
  269. // create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
  270. preBalanceAssignment := deepCopyAssignment(currentAssignment)
  271. preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer))
  272. for k, v := range currentPartitionConsumer {
  273. preBalancePartitionConsumers[k] = v
  274. }
  275. reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)
  276. // if we are not preserving existing assignments and we have made changes to the current assignment
  277. // make sure we are getting a more balanced assignment; otherwise, revert to previous assignment
  278. if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) {
  279. currentAssignment = deepCopyAssignment(preBalanceAssignment)
  280. currentPartitionConsumer = make(map[topicPartitionAssignment]string, len(preBalancePartitionConsumers))
  281. for k, v := range preBalancePartitionConsumers {
  282. currentPartitionConsumer[k] = v
  283. }
  284. }
  285. // add the fixed assignments (those that could not change) back
  286. for consumer, assignments := range fixedAssignments {
  287. currentAssignment[consumer] = assignments
  288. }
  289. }
  290. // Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs.
  291. // A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
  292. // Lower balance score indicates a more balanced assignment.
  293. func getBalanceScore(assignment map[string][]topicPartitionAssignment) int {
  294. consumer2AssignmentSize := make(map[string]int, len(assignment))
  295. for memberID, partitions := range assignment {
  296. consumer2AssignmentSize[memberID] = len(partitions)
  297. }
  298. var score float64
  299. for memberID, consumerAssignmentSize := range consumer2AssignmentSize {
  300. delete(consumer2AssignmentSize, memberID)
  301. for _, otherConsumerAssignmentSize := range consumer2AssignmentSize {
  302. score += math.Abs(float64(consumerAssignmentSize - otherConsumerAssignmentSize))
  303. }
  304. }
  305. return int(score)
  306. }
  307. // Determine whether the current assignment plan is balanced.
  308. func isBalanced(currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, allSubscriptions map[string][]topicPartitionAssignment) bool {
  309. sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
  310. min := len(currentAssignment[sortedCurrentSubscriptions[0]])
  311. max := len(currentAssignment[sortedCurrentSubscriptions[len(sortedCurrentSubscriptions)-1]])
  312. if min >= max-1 {
  313. // if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true
  314. return true
  315. }
  316. // create a mapping from partitions to the consumer assigned to them
  317. allPartitions := make(map[topicPartitionAssignment]string)
  318. for memberID, partitions := range currentAssignment {
  319. for _, partition := range partitions {
  320. if _, exists := allPartitions[partition]; exists {
  321. Logger.Printf("Topic %s Partition %d is assigned more than one consumer", partition.Topic, partition.Partition)
  322. }
  323. allPartitions[partition] = memberID
  324. }
  325. }
  326. // for each consumer that does not have all the topic partitions it can get make sure none of the topic partitions it
  327. // could but did not get cannot be moved to it (because that would break the balance)
  328. for _, memberID := range sortedCurrentSubscriptions {
  329. consumerPartitions := currentAssignment[memberID]
  330. consumerPartitionCount := len(consumerPartitions)
  331. // skip if this consumer already has all the topic partitions it can get
  332. if consumerPartitionCount == len(allSubscriptions[memberID]) {
  333. continue
  334. }
  335. // otherwise make sure it cannot get any more
  336. potentialTopicPartitions := allSubscriptions[memberID]
  337. for _, partition := range potentialTopicPartitions {
  338. if !memberAssignmentsIncludeTopicPartition(currentAssignment[memberID], partition) {
  339. otherConsumer := allPartitions[partition]
  340. otherConsumerPartitionCount := len(currentAssignment[otherConsumer])
  341. if consumerPartitionCount < otherConsumerPartitionCount {
  342. return false
  343. }
  344. }
  345. }
  346. }
  347. return true
  348. }
  349. // Reassign all topic partitions that need reassignment until balanced.
  350. func (s *stickyBalanceStrategy) performReassignments(reassignablePartitions []topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) bool {
  351. reassignmentPerformed := false
  352. modified := false
  353. // repeat reassignment until no partition can be moved to improve the balance
  354. for {
  355. modified = false
  356. // reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
  357. // until the full list is processed or a balance is achieved
  358. for _, partition := range reassignablePartitions {
  359. if isBalanced(currentAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions) {
  360. break
  361. }
  362. // the partition must have at least two consumers
  363. if len(partition2AllPotentialConsumers[partition]) <= 1 {
  364. Logger.Printf("Expected more than one potential consumer for partition %s topic %d", partition.Topic, partition.Partition)
  365. }
  366. // the partition must have a consumer
  367. consumer := currentPartitionConsumer[partition]
  368. if consumer == "" {
  369. Logger.Printf("Expected topic %s partition %d to be assigned to a consumer", partition.Topic, partition.Partition)
  370. }
  371. if _, exists := prevAssignment[partition]; exists {
  372. if len(currentAssignment[consumer]) > (len(currentAssignment[prevAssignment[partition].MemberID]) + 1) {
  373. sortedCurrentSubscriptions = s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, prevAssignment[partition].MemberID)
  374. reassignmentPerformed = true
  375. modified = true
  376. continue
  377. }
  378. }
  379. // check if a better-suited consumer exists for the partition; if so, reassign it
  380. for _, otherConsumer := range partition2AllPotentialConsumers[partition] {
  381. if len(currentAssignment[consumer]) > (len(currentAssignment[otherConsumer]) + 1) {
  382. sortedCurrentSubscriptions = s.reassignPartitionToNewConsumer(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions)
  383. reassignmentPerformed = true
  384. modified = true
  385. break
  386. }
  387. }
  388. }
  389. if !modified {
  390. return reassignmentPerformed
  391. }
  392. }
  393. }
  394. // Identify a new consumer for a topic partition and reassign it.
  395. func (s *stickyBalanceStrategy) reassignPartitionToNewConsumer(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []string {
  396. for _, anotherConsumer := range sortedCurrentSubscriptions {
  397. if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[anotherConsumer], partition) {
  398. return s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, anotherConsumer)
  399. }
  400. }
  401. return sortedCurrentSubscriptions
  402. }
  403. // Reassign a specific partition to a new consumer
  404. func (s *stickyBalanceStrategy) reassignPartition(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, newConsumer string) []string {
  405. consumer := currentPartitionConsumer[partition]
  406. // find the correct partition movement considering the stickiness requirement
  407. partitionToBeMoved := s.movements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer)
  408. return s.processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer)
  409. }
  410. // Track the movement of a topic partition after assignment
  411. func (s *stickyBalanceStrategy) processPartitionMovement(partition topicPartitionAssignment, newConsumer string, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
  412. oldConsumer := currentPartitionConsumer[partition]
  413. s.movements.movePartition(partition, oldConsumer, newConsumer)
  414. currentAssignment[oldConsumer] = removeTopicPartitionFromMemberAssignments(currentAssignment[oldConsumer], partition)
  415. currentAssignment[newConsumer] = append(currentAssignment[newConsumer], partition)
  416. currentPartitionConsumer[partition] = newConsumer
  417. return sortMemberIDsByPartitionAssignments(currentAssignment)
  418. }
  419. // Determine whether a specific consumer should be considered for topic partition assignment.
  420. func canConsumerParticipateInReassignment(memberID string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
  421. currentPartitions := currentAssignment[memberID]
  422. currentAssignmentSize := len(currentPartitions)
  423. maxAssignmentSize := len(consumer2AllPotentialPartitions[memberID])
  424. if currentAssignmentSize > maxAssignmentSize {
  425. Logger.Printf("The consumer %s is assigned more partitions than the maximum possible", memberID)
  426. }
  427. if currentAssignmentSize < maxAssignmentSize {
  428. // if a consumer is not assigned all its potential partitions it is subject to reassignment
  429. return true
  430. }
  431. for _, partition := range currentPartitions {
  432. if canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
  433. return true
  434. }
  435. }
  436. return false
  437. }
  438. // Only consider reassigning those topic partitions that have two or more potential consumers.
  439. func canTopicPartitionParticipateInReassignment(partition topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
  440. return len(partition2AllPotentialConsumers[partition]) >= 2
  441. }
  442. // The assignment should improve the overall balance of the partition assignments to consumers.
  443. func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscriptions []string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
  444. for _, memberID := range sortedCurrentSubscriptions {
  445. if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[memberID], partition) {
  446. currentAssignment[memberID] = append(currentAssignment[memberID], partition)
  447. currentPartitionConsumer[partition] = memberID
  448. break
  449. }
  450. }
  451. return sortMemberIDsByPartitionAssignments(currentAssignment)
  452. }
  453. // Deserialize topic partition assignment data to aid with creation of a sticky assignment.
  454. func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
  455. userDataV1 := &StickyAssignorUserDataV1{}
  456. if err := decode(userDataBytes, userDataV1); err != nil {
  457. userDataV0 := &StickyAssignorUserDataV0{}
  458. if err := decode(userDataBytes, userDataV0); err != nil {
  459. return nil, err
  460. }
  461. return userDataV0, nil
  462. }
  463. return userDataV1, nil
  464. }
  465. // filterAssignedPartitions returns a map of consumer group members to their list of previously-assigned topic partitions, limited
  466. // to those topic partitions currently reported by the Kafka cluster.
  467. func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) map[string][]topicPartitionAssignment {
  468. assignments := deepCopyAssignment(currentAssignment)
  469. for memberID, partitions := range assignments {
  470. // perform in-place filtering
  471. i := 0
  472. for _, partition := range partitions {
  473. if _, exists := partition2AllPotentialConsumers[partition]; exists {
  474. partitions[i] = partition
  475. i++
  476. }
  477. }
  478. assignments[memberID] = partitions[:i]
  479. }
  480. return assignments
  481. }
  482. func removeTopicPartitionFromMemberAssignments(assignments []topicPartitionAssignment, topic topicPartitionAssignment) []topicPartitionAssignment {
  483. for i, assignment := range assignments {
  484. if assignment == topic {
  485. return append(assignments[:i], assignments[i+1:]...)
  486. }
  487. }
  488. return assignments
  489. }
  490. func memberAssignmentsIncludeTopicPartition(assignments []topicPartitionAssignment, topic topicPartitionAssignment) bool {
  491. for _, assignment := range assignments {
  492. if assignment == topic {
  493. return true
  494. }
  495. }
  496. return false
  497. }
  498. func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment {
  499. unassignedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
  500. for partition := range partition2AllPotentialConsumers {
  501. unassignedPartitions[partition] = true
  502. }
  503. sortedPartitions := make([]topicPartitionAssignment, 0)
  504. if !isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions) {
  505. // if this is a reassignment and the subscriptions are identical (all consumers can consumer from all topics)
  506. // then we just need to simply list partitions in a round robin fashion (from consumers with
  507. // most assigned partitions to those with least)
  508. assignments := filterAssignedPartitions(currentAssignment, partition2AllPotentialConsumers)
  509. // use priority-queue to evaluate consumer group members in descending-order based on
  510. // the number of topic partition assignments (i.e. consumers with most assignments first)
  511. pq := make(assignmentPriorityQueue, len(assignments))
  512. i := 0
  513. for consumerID, consumerAssignments := range assignments {
  514. pq[i] = &consumerGroupMember{
  515. id: consumerID,
  516. assignments: consumerAssignments,
  517. }
  518. i++
  519. }
  520. heap.Init(&pq)
  521. for {
  522. // loop until no consumer-group members remain
  523. if pq.Len() == 0 {
  524. break
  525. }
  526. member := pq[0]
  527. // partitions that were assigned to a different consumer last time
  528. var prevPartitionIndex int
  529. for i, partition := range member.assignments {
  530. if _, exists := partitionsWithADifferentPreviousAssignment[partition]; exists {
  531. prevPartitionIndex = i
  532. break
  533. }
  534. }
  535. if len(member.assignments) > 0 {
  536. partition := member.assignments[prevPartitionIndex]
  537. sortedPartitions = append(sortedPartitions, partition)
  538. delete(unassignedPartitions, partition)
  539. if prevPartitionIndex == 0 {
  540. member.assignments = member.assignments[1:]
  541. } else {
  542. member.assignments = append(member.assignments[:prevPartitionIndex], member.assignments[prevPartitionIndex+1:]...)
  543. }
  544. heap.Fix(&pq, 0)
  545. } else {
  546. heap.Pop(&pq)
  547. }
  548. }
  549. for partition := range unassignedPartitions {
  550. sortedPartitions = append(sortedPartitions, partition)
  551. }
  552. } else {
  553. // an ascending sorted set of topic partitions based on how many consumers can potentially use them
  554. sortedPartitions = sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers)
  555. }
  556. return sortedPartitions
  557. }
  558. func sortMemberIDsByPartitionAssignments(assignments map[string][]topicPartitionAssignment) []string {
  559. // sort the members by the number of partition assignments in ascending order
  560. sortedMemberIDs := make([]string, 0, len(assignments))
  561. for memberID := range assignments {
  562. sortedMemberIDs = append(sortedMemberIDs, memberID)
  563. }
  564. sort.SliceStable(sortedMemberIDs, func(i, j int) bool {
  565. ret := len(assignments[sortedMemberIDs[i]]) - len(assignments[sortedMemberIDs[j]])
  566. if ret == 0 {
  567. return sortedMemberIDs[i] < sortedMemberIDs[j]
  568. }
  569. return len(assignments[sortedMemberIDs[i]]) < len(assignments[sortedMemberIDs[j]])
  570. })
  571. return sortedMemberIDs
  572. }
  573. func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers map[topicPartitionAssignment][]string) []topicPartitionAssignment {
  574. // sort the members by the number of partition assignments in descending order
  575. sortedPartionIDs := make([]topicPartitionAssignment, len(partition2AllPotentialConsumers))
  576. i := 0
  577. for partition := range partition2AllPotentialConsumers {
  578. sortedPartionIDs[i] = partition
  579. i++
  580. }
  581. sort.Slice(sortedPartionIDs, func(i, j int) bool {
  582. if len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) == len(partition2AllPotentialConsumers[sortedPartionIDs[j]]) {
  583. ret := strings.Compare(sortedPartionIDs[i].Topic, sortedPartionIDs[j].Topic)
  584. if ret == 0 {
  585. return sortedPartionIDs[i].Partition < sortedPartionIDs[j].Partition
  586. }
  587. return ret < 0
  588. }
  589. return len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) < len(partition2AllPotentialConsumers[sortedPartionIDs[j]])
  590. })
  591. return sortedPartionIDs
  592. }
  593. func deepCopyPartitions(src []topicPartitionAssignment) []topicPartitionAssignment {
  594. dst := make([]topicPartitionAssignment, len(src))
  595. for i, partition := range src {
  596. dst[i] = partition
  597. }
  598. return dst
  599. }
  600. func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
  601. copy := make(map[string][]topicPartitionAssignment, len(assignment))
  602. for memberID, subscriptions := range assignment {
  603. copy[memberID] = append(subscriptions[:0:0], subscriptions...)
  604. }
  605. return copy
  606. }
  607. func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
  608. curMembers := make(map[string]int)
  609. for _, cur := range partition2AllPotentialConsumers {
  610. if len(curMembers) == 0 {
  611. for _, curMembersElem := range cur {
  612. curMembers[curMembersElem]++
  613. }
  614. continue
  615. }
  616. if len(curMembers) != len(cur) {
  617. return false
  618. }
  619. yMap := make(map[string]int)
  620. for _, yElem := range cur {
  621. yMap[yElem]++
  622. }
  623. for curMembersMapKey, curMembersMapVal := range curMembers {
  624. if yMap[curMembersMapKey] != curMembersMapVal {
  625. return false
  626. }
  627. }
  628. }
  629. curPartitions := make(map[topicPartitionAssignment]int)
  630. for _, cur := range consumer2AllPotentialPartitions {
  631. if len(curPartitions) == 0 {
  632. for _, curPartitionElem := range cur {
  633. curPartitions[curPartitionElem]++
  634. }
  635. continue
  636. }
  637. if len(curPartitions) != len(cur) {
  638. return false
  639. }
  640. yMap := make(map[topicPartitionAssignment]int)
  641. for _, yElem := range cur {
  642. yMap[yElem]++
  643. }
  644. for curMembersMapKey, curMembersMapVal := range curPartitions {
  645. if yMap[curMembersMapKey] != curMembersMapVal {
  646. return false
  647. }
  648. }
  649. }
  650. return true
  651. }
  652. // We need to process subscriptions' user data with each consumer's reported generation in mind
  653. // higher generations overwrite lower generations in case of a conflict
  654. // note that a conflict could exist only if user data is for different generations
  655. func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {
  656. currentAssignment := make(map[string][]topicPartitionAssignment)
  657. prevAssignment := make(map[topicPartitionAssignment]consumerGenerationPair)
  658. // for each partition we create a sorted map of its consumers by generation
  659. sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
  660. for memberID, meta := range members {
  661. consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
  662. if err != nil {
  663. return nil, nil, err
  664. }
  665. for _, partition := range consumerUserData.partitions() {
  666. if consumers, exists := sortedPartitionConsumersByGeneration[partition]; exists {
  667. if consumerUserData.hasGeneration() {
  668. if _, generationExists := consumers[consumerUserData.generation()]; generationExists {
  669. // same partition is assigned to two consumers during the same rebalance.
  670. // log a warning and skip this record
  671. Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, consumerUserData.generation())
  672. continue
  673. } else {
  674. consumers[consumerUserData.generation()] = memberID
  675. }
  676. } else {
  677. consumers[defaultGeneration] = memberID
  678. }
  679. } else {
  680. generation := defaultGeneration
  681. if consumerUserData.hasGeneration() {
  682. generation = consumerUserData.generation()
  683. }
  684. sortedPartitionConsumersByGeneration[partition] = map[int]string{generation: memberID}
  685. }
  686. }
  687. }
  688. // prevAssignment holds the prior ConsumerGenerationPair (before current) of each partition
  689. // current and previous consumers are the last two consumers of each partition in the above sorted map
  690. for partition, consumers := range sortedPartitionConsumersByGeneration {
  691. // sort consumers by generation in decreasing order
  692. var generations []int
  693. for generation := range consumers {
  694. generations = append(generations, generation)
  695. }
  696. sort.Sort(sort.Reverse(sort.IntSlice(generations)))
  697. consumer := consumers[generations[0]]
  698. if _, exists := currentAssignment[consumer]; !exists {
  699. currentAssignment[consumer] = []topicPartitionAssignment{partition}
  700. } else {
  701. currentAssignment[consumer] = append(currentAssignment[consumer], partition)
  702. }
  703. // check for previous assignment, if any
  704. if len(generations) > 1 {
  705. prevAssignment[partition] = consumerGenerationPair{
  706. MemberID: consumers[generations[1]],
  707. Generation: generations[1],
  708. }
  709. }
  710. }
  711. return currentAssignment, prevAssignment, nil
  712. }
  713. type consumerGenerationPair struct {
  714. MemberID string
  715. Generation int
  716. }
  717. // consumerPair represents a pair of Kafka consumer ids involved in a partition reassignment.
  718. type consumerPair struct {
  719. SrcMemberID string
  720. DstMemberID string
  721. }
  722. // partitionMovements maintains some data structures to simplify lookup of partition movements among consumers.
  723. type partitionMovements struct {
  724. PartitionMovementsByTopic map[string]map[consumerPair]map[topicPartitionAssignment]bool
  725. Movements map[topicPartitionAssignment]consumerPair
  726. }
  727. func (p *partitionMovements) removeMovementRecordOfPartition(partition topicPartitionAssignment) consumerPair {
  728. pair := p.Movements[partition]
  729. delete(p.Movements, partition)
  730. partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
  731. delete(partitionMovementsForThisTopic[pair], partition)
  732. if len(partitionMovementsForThisTopic[pair]) == 0 {
  733. delete(partitionMovementsForThisTopic, pair)
  734. }
  735. if len(p.PartitionMovementsByTopic[partition.Topic]) == 0 {
  736. delete(p.PartitionMovementsByTopic, partition.Topic)
  737. }
  738. return pair
  739. }
  740. func (p *partitionMovements) addPartitionMovementRecord(partition topicPartitionAssignment, pair consumerPair) {
  741. p.Movements[partition] = pair
  742. if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
  743. p.PartitionMovementsByTopic[partition.Topic] = make(map[consumerPair]map[topicPartitionAssignment]bool)
  744. }
  745. partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
  746. if _, exists := partitionMovementsForThisTopic[pair]; !exists {
  747. partitionMovementsForThisTopic[pair] = make(map[topicPartitionAssignment]bool)
  748. }
  749. partitionMovementsForThisTopic[pair][partition] = true
  750. }
  751. func (p *partitionMovements) movePartition(partition topicPartitionAssignment, oldConsumer, newConsumer string) {
  752. pair := consumerPair{
  753. SrcMemberID: oldConsumer,
  754. DstMemberID: newConsumer,
  755. }
  756. if _, exists := p.Movements[partition]; exists {
  757. // this partition has previously moved
  758. existingPair := p.removeMovementRecordOfPartition(partition)
  759. if existingPair.DstMemberID != oldConsumer {
  760. Logger.Printf("Existing pair DstMemberID %s was not equal to the oldConsumer ID %s", existingPair.DstMemberID, oldConsumer)
  761. }
  762. if existingPair.SrcMemberID != newConsumer {
  763. // the partition is not moving back to its previous consumer
  764. p.addPartitionMovementRecord(partition, consumerPair{
  765. SrcMemberID: existingPair.SrcMemberID,
  766. DstMemberID: newConsumer,
  767. })
  768. }
  769. } else {
  770. p.addPartitionMovementRecord(partition, pair)
  771. }
  772. }
  773. func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicPartitionAssignment, oldConsumer, newConsumer string) topicPartitionAssignment {
  774. if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
  775. return partition
  776. }
  777. if _, exists := p.Movements[partition]; exists {
  778. // this partition has previously moved
  779. if oldConsumer != p.Movements[partition].DstMemberID {
  780. Logger.Printf("Partition movement DstMemberID %s was not equal to the oldConsumer ID %s", p.Movements[partition].DstMemberID, oldConsumer)
  781. }
  782. oldConsumer = p.Movements[partition].SrcMemberID
  783. }
  784. partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
  785. reversePair := consumerPair{
  786. SrcMemberID: newConsumer,
  787. DstMemberID: oldConsumer,
  788. }
  789. if _, exists := partitionMovementsForThisTopic[reversePair]; !exists {
  790. return partition
  791. }
  792. var reversePairPartition topicPartitionAssignment
  793. for otherPartition := range partitionMovementsForThisTopic[reversePair] {
  794. reversePairPartition = otherPartition
  795. }
  796. return reversePairPartition
  797. }
  798. func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) {
  799. if src == dst {
  800. return currentPath, false
  801. }
  802. if len(pairs) == 0 {
  803. return currentPath, false
  804. }
  805. for _, pair := range pairs {
  806. if src == pair.SrcMemberID && dst == pair.DstMemberID {
  807. currentPath = append(currentPath, src, dst)
  808. return currentPath, true
  809. }
  810. }
  811. for _, pair := range pairs {
  812. if pair.SrcMemberID == src {
  813. // create a deep copy of the pairs, excluding the current pair
  814. reducedSet := make([]consumerPair, len(pairs)-1)
  815. i := 0
  816. for _, p := range pairs {
  817. if p != pair {
  818. reducedSet[i] = pair
  819. i++
  820. }
  821. }
  822. currentPath = append(currentPath, pair.SrcMemberID)
  823. return p.isLinked(pair.DstMemberID, dst, reducedSet, currentPath)
  824. }
  825. }
  826. return currentPath, false
  827. }
  828. func (p *partitionMovements) in(cycle []string, cycles [][]string) bool {
  829. superCycle := make([]string, len(cycle)-1)
  830. for i := 0; i < len(cycle)-1; i++ {
  831. superCycle[i] = cycle[i]
  832. }
  833. for _, c := range cycle {
  834. superCycle = append(superCycle, c)
  835. }
  836. for _, foundCycle := range cycles {
  837. if len(foundCycle) == len(cycle) && indexOfSubList(superCycle, foundCycle) != -1 {
  838. return true
  839. }
  840. }
  841. return false
  842. }
  843. func (p *partitionMovements) hasCycles(pairs []consumerPair) bool {
  844. cycles := make([][]string, 0)
  845. for _, pair := range pairs {
  846. // create a deep copy of the pairs, excluding the current pair
  847. reducedPairs := make([]consumerPair, len(pairs)-1)
  848. i := 0
  849. for _, p := range pairs {
  850. if p != pair {
  851. reducedPairs[i] = pair
  852. i++
  853. }
  854. }
  855. if path, linked := p.isLinked(pair.DstMemberID, pair.SrcMemberID, reducedPairs, []string{pair.SrcMemberID}); linked {
  856. if !p.in(path, cycles) {
  857. cycles = append(cycles, path)
  858. Logger.Printf("A cycle of length %d was found: %v", len(path)-1, path)
  859. }
  860. }
  861. }
  862. // for now we want to make sure there is no partition movements of the same topic between a pair of consumers.
  863. // the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized
  864. // tests with the given sticky algorithm) that it should not worth the added complexity of handling those cases.
  865. for _, cycle := range cycles {
  866. if len(cycle) == 3 {
  867. return true
  868. }
  869. }
  870. return false
  871. }
  872. func (p *partitionMovements) isSticky() bool {
  873. for topic, movements := range p.PartitionMovementsByTopic {
  874. movementPairs := make([]consumerPair, len(movements))
  875. i := 0
  876. for pair := range movements {
  877. movementPairs[i] = pair
  878. i++
  879. }
  880. if p.hasCycles(movementPairs) {
  881. Logger.Printf("Stickiness is violated for topic %s", topic)
  882. Logger.Printf("Partition movements for this topic occurred among the following consumer pairs: %v", movements)
  883. return false
  884. }
  885. }
  886. return true
  887. }
  888. func indexOfSubList(source []string, target []string) int {
  889. targetSize := len(target)
  890. maxCandidate := len(source) - targetSize
  891. nextCand:
  892. for candidate := 0; candidate <= maxCandidate; candidate++ {
  893. j := candidate
  894. for i := 0; i < targetSize; i++ {
  895. if target[i] != source[j] {
  896. // Element mismatch, try next cand
  897. continue nextCand
  898. }
  899. j++
  900. }
  901. // All elements of candidate matched target
  902. return candidate
  903. }
  904. return -1
  905. }
  906. type consumerGroupMember struct {
  907. id string
  908. assignments []topicPartitionAssignment
  909. }
  910. // assignmentPriorityQueue is a priority-queue of consumer group members that is sorted
  911. // in descending order (most assignments to least assignments).
  912. type assignmentPriorityQueue []*consumerGroupMember
  913. func (pq assignmentPriorityQueue) Len() int { return len(pq) }
  914. func (pq assignmentPriorityQueue) Less(i, j int) bool {
  915. // order asssignment priority queue in descending order using assignment-count/member-id
  916. if len(pq[i].assignments) == len(pq[j].assignments) {
  917. return strings.Compare(pq[i].id, pq[j].id) > 0
  918. }
  919. return len(pq[i].assignments) > len(pq[j].assignments)
  920. }
  921. func (pq assignmentPriorityQueue) Swap(i, j int) {
  922. pq[i], pq[j] = pq[j], pq[i]
  923. }
  924. func (pq *assignmentPriorityQueue) Push(x interface{}) {
  925. member := x.(*consumerGroupMember)
  926. *pq = append(*pq, member)
  927. }
  928. func (pq *assignmentPriorityQueue) Pop() interface{} {
  929. old := *pq
  930. n := len(old)
  931. member := old[n-1]
  932. *pq = old[0 : n-1]
  933. return member
  934. }