Browse Source

Make the partition strategy assignment names constants, and compare the active strategy name against the sticky balance strategy name when handling a sync group request to allow for easy extension of the sticky balance strategy

Scott Kidder 5 years ago
parent
commit
78d3678f40
2 changed files with 13 additions and 4 deletions
  1. 12 3
      balance_strategy.go
  2. 1 1
      consumer_group.go

+ 12 - 3
balance_strategy.go

@@ -6,6 +6,15 @@ import (
 )
 
 const (
+	// RangeBalanceStrategyName identifies strategies that use the range partition assignment strategy
+	RangeBalanceStrategyName = "range"
+
+	// RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy
+	RoundRobinBalanceStrategyName = "roundrobin"
+
+	// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
+	StickyBalanceStrategyName = "sticky"
+
 	defaultGeneration = -1
 )
 
@@ -45,7 +54,7 @@ type BalanceStrategy interface {
 //   M1: {T: [0, 1, 2]}
 //   M2: {T: [3, 4, 5]}
 var BalanceStrategyRange = &balanceStrategy{
-	name: "range",
+	name: RangeBalanceStrategyName,
 	coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
 		step := float64(len(partitions)) / float64(len(memberIDs))
 
@@ -63,7 +72,7 @@ var BalanceStrategyRange = &balanceStrategy{
 //   M1: {T: [0, 2, 4]}
 //   M2: {T: [1, 3, 5]}
 var BalanceStrategyRoundRobin = &balanceStrategy{
-	name: "roundrobin",
+	name: RoundRobinBalanceStrategyName,
 	coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
 		for i, part := range partitions {
 			memberID := memberIDs[i%len(memberIDs)]
@@ -150,7 +159,7 @@ type stickyBalanceStrategy struct {
 }
 
 // Name implements BalanceStrategy.
-func (s *stickyBalanceStrategy) Name() string { return "sticky" }
+func (s *stickyBalanceStrategy) Name() string { return StickyBalanceStrategyName }
 
 // Plan implements BalanceStrategy.
 func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {

+ 1 - 1
consumer_group.go

@@ -333,7 +333,7 @@ func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrate
 		assignment := &ConsumerGroupMemberAssignment{Topics: topics}
 
 		// Include topic assignments in group-assignment userdata for each consumer-group member
-		if c.config.Consumer.Group.Rebalance.Strategy == BalanceStrategySticky {
+		if c.config.Consumer.Group.Rebalance.Strategy.Name() == StickyBalanceStrategyName {
 			userDataBytes, err := encode(&StickyAssignorUserDataV1{
 				Topics:     topics,
 				Generation: generationID,