123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- package sarama
- import (
- "math"
- "sort"
- )
// BalanceStrategyPlan holds the outcome of a BalanceStrategy.Plan call.
// It records which partitions each member was given, keyed first by member
// ID and then by topic (`memberID -> topic -> partitions`).
type BalanceStrategyPlan map[string]map[string][]int32

// Add appends the given partitions of topic to the assignment of memberID.
// Calling Add without any partitions is a no-op and creates no entries.
func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
	if len(partitions) == 0 {
		return
	}

	// Lazily create the per-member topic map on first use.
	assigned, known := p[memberID]
	if !known {
		assigned = make(map[string][]int32, 1)
		p[memberID] = assigned
	}
	assigned[topic] = append(assigned[topic], partitions...)
}
- // --------------------------------------------------------------------
- // BalanceStrategy is used to balance topics and partitions
- // across members of a consumer group
- type BalanceStrategy interface {
- // Name uniquely identifies the strategy.
- Name() string
- // Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
- // and returns a distribution plan.
- Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
- }
- // --------------------------------------------------------------------
- // BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members.
- // Example with one topic T with six partitions (0..5) and two members (M1, M2):
- // M1: {T: [0, 1, 2]}
- // M2: {T: [3, 4, 5]}
- var BalanceStrategyRange = &balanceStrategy{
- name: "range",
- coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
- step := float64(len(partitions)) / float64(len(memberIDs))
- for i, memberID := range memberIDs {
- pos := float64(i)
- min := int(math.Floor(pos*step + 0.5))
- max := int(math.Floor((pos+1)*step + 0.5))
- plan.Add(memberID, topic, partitions[min:max]...)
- }
- },
- }
- // BalanceStrategyRoundRobin assigns partitions to members in alternating order.
- // Example with topic T with six partitions (0..5) and two members (M1, M2):
- // M1: {T: [0, 2, 4]}
- // M2: {T: [1, 3, 5]}
- var BalanceStrategyRoundRobin = &balanceStrategy{
- name: "roundrobin",
- coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
- for i, part := range partitions {
- memberID := memberIDs[i%len(memberIDs)]
- plan.Add(memberID, topic, part)
- }
- },
- }
- // --------------------------------------------------------------------
- type balanceStrategy struct {
- name string
- coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
- }
- // Name implements BalanceStrategy.
- func (s *balanceStrategy) Name() string { return s.name }
- // Plan implements BalanceStrategy.
- func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
- // Build members by topic map
- mbt := make(map[string][]string)
- for memberID, meta := range members {
- for _, topic := range meta.Topics {
- mbt[topic] = append(mbt[topic], memberID)
- }
- }
- // Sort members for each topic
- for topic, memberIDs := range mbt {
- sort.Sort(&balanceStrategySortable{
- topic: topic,
- memberIDs: memberIDs,
- })
- }
- // Assemble plan
- plan := make(BalanceStrategyPlan, len(members))
- for topic, memberIDs := range mbt {
- s.coreFn(plan, memberIDs, topic, topics[topic])
- }
- return plan, nil
- }
- type balanceStrategySortable struct {
- topic string
- memberIDs []string
- }
- func (p balanceStrategySortable) Len() int { return len(p.memberIDs) }
- func (p balanceStrategySortable) Swap(i, j int) {
- p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]
- }
- func (p balanceStrategySortable) Less(i, j int) bool {
- return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])
- }
// balanceStrategyHashValue folds the given strings, in order, into a single
// 32-bit hash. It follows the FNV-1a scheme (xor, then multiply by the FNV
// prime) but consumes each string rune by rune rather than byte by byte.
// No separator is mixed in between strings, so the result equals the hash
// of their concatenation.
func balanceStrategyHashValue(vv ...string) uint32 {
	const (
		offsetBasis uint32 = 2166136261
		prime       uint32 = 16777619
	)

	hash := offsetBasis
	for idx := range vv {
		for _, r := range vv[idx] {
			hash = (hash ^ uint32(r)) * prime
		}
	}
	return hash
}
|