balance_strategy.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package sarama
  2. import (
  3. "math"
  4. "sort"
  5. )
  6. // BalanceStrategyPlan is the results of any BalanceStrategy.Plan attempt.
  7. // It contains an allocation of topic/partitions by memberID in the form of
  8. // a `memberID -> topic -> partitions` map.
  9. type BalanceStrategyPlan map[string]map[string][]int32
  10. // Add assigns a topic with a number partitions to a member.
  11. func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
  12. if len(partitions) == 0 {
  13. return
  14. }
  15. if _, ok := p[memberID]; !ok {
  16. p[memberID] = make(map[string][]int32, 1)
  17. }
  18. p[memberID][topic] = append(p[memberID][topic], partitions...)
  19. }
  20. // --------------------------------------------------------------------
  21. // BalanceStrategy is used to balance topics and partitions
  22. // across memebers of a consumer group
  23. type BalanceStrategy interface {
  24. // Name uniquely identifies the strategy.
  25. Name() string
  26. // Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
  27. // and returns a distribution plan.
  28. Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
  29. }
  30. // --------------------------------------------------------------------
  31. // BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members.
  32. // Example with one topic T with six partitions (0..5) and two members (M1, M2):
  33. // M1: {T: [0, 1, 2]}
  34. // M2: {T: [3, 4, 5]}
  35. var BalanceStrategyRange = &balanceStrategy{
  36. name: "range",
  37. coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
  38. step := float64(len(partitions)) / float64(len(memberIDs))
  39. for i, memberID := range memberIDs {
  40. pos := float64(i)
  41. min := int(math.Floor(pos*step + 0.5))
  42. max := int(math.Floor((pos+1)*step + 0.5))
  43. plan.Add(memberID, topic, partitions[min:max]...)
  44. }
  45. },
  46. }
  47. // BalanceStrategyRoundRobin assigns partitions to members in alternating order.
  48. // Example with topic T with six partitions (0..5) and two members (M1, M2):
  49. // M1: {T: [0, 2, 4]}
  50. // M2: {T: [1, 3, 5]}
  51. var BalanceStrategyRoundRobin = &balanceStrategy{
  52. name: "roundrobin",
  53. coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
  54. for i, part := range partitions {
  55. memberID := memberIDs[i%len(memberIDs)]
  56. plan.Add(memberID, topic, part)
  57. }
  58. },
  59. }
  60. // --------------------------------------------------------------------
  61. type balanceStrategy struct {
  62. name string
  63. coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
  64. }
  65. // Name implements BalanceStrategy.
  66. func (s *balanceStrategy) Name() string { return s.name }
  67. // Balance implements BalanceStrategy.
  68. func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
  69. // Build members by topic map
  70. mbt := make(map[string][]string)
  71. for memberID, meta := range members {
  72. for _, topic := range meta.Topics {
  73. mbt[topic] = append(mbt[topic], memberID)
  74. }
  75. }
  76. // Sort members for each topic
  77. for topic, memberIDs := range mbt {
  78. sort.Sort(&balanceStrategySortable{
  79. topic: topic,
  80. memberIDs: memberIDs,
  81. })
  82. }
  83. // Assemble plan
  84. plan := make(BalanceStrategyPlan, len(members))
  85. for topic, memberIDs := range mbt {
  86. s.coreFn(plan, memberIDs, topic, topics[topic])
  87. }
  88. return plan, nil
  89. }
  90. type balanceStrategySortable struct {
  91. topic string
  92. memberIDs []string
  93. }
  94. func (p balanceStrategySortable) Len() int { return len(p.memberIDs) }
  95. func (p balanceStrategySortable) Swap(i, j int) {
  96. p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]
  97. }
  98. func (p balanceStrategySortable) Less(i, j int) bool {
  99. return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])
  100. }
  101. func balanceStrategyHashValue(vv ...string) uint32 {
  102. h := uint32(2166136261)
  103. for _, s := range vv {
  104. for _, c := range s {
  105. h ^= uint32(c)
  106. h *= 16777619
  107. }
  108. }
  109. return h
  110. }