Browse Source

Allow BalanceStrategy to provide custom assignment data

The StickyBalanceStrategy currently provides state information with member assignments. This is achieved through a custom name check in the `syncGroupRequest` of the consumer group. This works well for this case, but falls apart when trying to create additional stateful balance strategies. This update will make it possible to create new stateful balance strategies
Al DeLucca 5 years ago
parent
commit
bf1b8fb51e
2 changed files with 27 additions and 11 deletions
  1. 22 0
      balance_strategy.go
  2. 5 11
      consumer_group.go

+ 22 - 0
balance_strategy.go

@@ -47,6 +47,10 @@ type BalanceStrategy interface {
 	// Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
 	// and returns a distribution plan.
 	Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
+
+	// Assignment data returns the serialized assignment data for the specified
+	// memberID
+	AssignmentData(plan BalanceStrategyPlan, memberID string, generationID int32) ([]byte, error)
 }
 
 // --------------------------------------------------------------------
@@ -132,6 +136,11 @@ func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, t
 	return plan, nil
 }
 
+// AssignmentData simple strategies do not require any shared assignment data
+func (s *balanceStrategy) AssignmentData(plan BalanceStrategyPlan, memberID string, generationID int32) ([]byte, error) {
+	return nil, nil
+}
+
 type balanceStrategySortable struct {
 	topic     string
 	memberIDs []string
@@ -268,6 +277,19 @@ func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetad
 	return plan, nil
 }
 
+// AssignmentData serializes the set of topics currently assigned to the
+// specified member as part of the supplied balance plan
+func (s *stickyBalanceStrategy) AssignmentData(plan BalanceStrategyPlan, memberID string, generationID int32) ([]byte, error) {
+	topics, ok := plan[memberID]
+	if !ok {
+		return nil, nil
+	}
+	return encode(&StickyAssignorUserDataV1{
+		Topics:     topics,
+		Generation: generationID,
+	}, nil)
+}
+
 func strsContains(s []string, value string) bool {
 	for _, entry := range s {
 		if entry == value {

+ 5 - 11
consumer_group.go

@@ -331,20 +331,14 @@ func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrate
 		MemberId:     c.memberID,
 		GenerationId: generationID,
 	}
+	strategy := c.config.Consumer.Group.Rebalance.Strategy
 	for memberID, topics := range plan {
 		assignment := &ConsumerGroupMemberAssignment{Topics: topics}
-
-		// Include topic assignments in group-assignment userdata for each consumer-group member
-		if c.config.Consumer.Group.Rebalance.Strategy.Name() == StickyBalanceStrategyName {
-			userDataBytes, err := encode(&StickyAssignorUserDataV1{
-				Topics:     topics,
-				Generation: generationID,
-			}, nil)
-			if err != nil {
-				return nil, err
-			}
-			assignment.UserData = userDataBytes
+		userDataBytes, err := strategy.AssignmentData(plan, memberID, generationID)
+		if err != nil {
+			return nil, err
 		}
+		assignment.UserData = userDataBytes
 		if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
 			return nil, err
 		}