Browse Source

Address review comments

Al DeLucca 5 years ago
parent
commit
46f804e149
3 changed files with 70 additions and 8 deletions
  1. 3 7
      balance_strategy.go
  2. 66 0
      balance_strategy_test.go
  3. 1 1
      consumer_group.go

+ 3 - 7
balance_strategy.go

@@ -50,7 +50,7 @@ type BalanceStrategy interface {
 
 	// AssignmentData returns the serialized assignment data for the specified
 	// memberID
-	AssignmentData(plan BalanceStrategyPlan, memberID string, generationID int32) ([]byte, error)
+	AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
 }
 
 // --------------------------------------------------------------------
@@ -137,7 +137,7 @@ func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, t
 }
 
 // AssignmentData simple strategies do not require any shared assignment data
-func (s *balanceStrategy) AssignmentData(plan BalanceStrategyPlan, memberID string, generationID int32) ([]byte, error) {
+func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
 	return nil, nil
 }
 
@@ -279,11 +279,7 @@ func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetad
 
 // AssignmentData serializes the set of topics currently assigned to the
 // specified member as part of the supplied balance plan
-func (s *stickyBalanceStrategy) AssignmentData(plan BalanceStrategyPlan, memberID string, generationID int32) ([]byte, error) {
-	topics, ok := plan[memberID]
-	if !ok {
-		return nil, nil
-	}
+func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
 	return encode(&StickyAssignorUserDataV1{
 		Topics:     topics,
 		Generation: generationID,

+ 66 - 0
balance_strategy_test.go

@@ -1,6 +1,7 @@
 package sarama
 
 import (
+	"bytes"
 	"fmt"
 	"math"
 	"math/rand"
@@ -62,6 +63,27 @@ func TestBalanceStrategyRange(t *testing.T) {
 	}
 }
 
+func TestBalanceStrategyRangeAssignmentData(t *testing.T) {
+
+	strategy := BalanceStrategyRange
+
+	members := make(map[string]ConsumerGroupMemberMetadata, 2)
+	members["consumer1"] = ConsumerGroupMemberMetadata{
+		Topics: []string{"topic1"},
+	}
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics: []string{"topic1"},
+	}
+
+	actual, err := strategy.AssignmentData("consumer1", map[string][]int32{"topic1": {0, 1}}, 1)
+	if err != nil {
+		t.Errorf("Error building assignment data: %v", err)
+	}
+	if actual != nil {
+		t.Error("Invalid assignment data returned from AssignmentData")
+	}
+}
+
 func TestBalanceStrategyRoundRobin(t *testing.T) {
 	tests := []struct {
 		members  map[string][]string
@@ -191,6 +213,27 @@ func Test_deserializeTopicPartitionAssignment(t *testing.T) {
 	}
 }
 
+func TestBalanceStrategyRoundRobinAssignmentData(t *testing.T) {
+
+	strategy := BalanceStrategyRoundRobin
+
+	members := make(map[string]ConsumerGroupMemberMetadata, 2)
+	members["consumer1"] = ConsumerGroupMemberMetadata{
+		Topics: []string{"topic1"},
+	}
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics: []string{"topic1"},
+	}
+
+	actual, err := strategy.AssignmentData("consumer1", map[string][]int32{"topic1": {0, 1}}, 1)
+	if err != nil {
+		t.Errorf("Error building assignment data: %v", err)
+	}
+	if actual != nil {
+		t.Error("Invalid assignment data returned from AssignmentData")
+	}
+}
+
 func Test_prepopulateCurrentAssignments(t *testing.T) {
 	type args struct {
 		members map[string]ConsumerGroupMemberMetadata
@@ -1950,6 +1993,29 @@ func Test_stickyBalanceStrategy_Plan_ConflictingPreviousAssignments(t *testing.T
 	verifyFullyBalanced(t, plan)
 }
 
+func Test_stickyBalanceStrategy_Plan_AssignmentData(t *testing.T) {
+
+	s := &stickyBalanceStrategy{}
+
+	members := make(map[string]ConsumerGroupMemberMetadata, 2)
+	members["consumer1"] = ConsumerGroupMemberMetadata{
+		Topics: []string{"topic1"},
+	}
+	members["consumer2"] = ConsumerGroupMemberMetadata{
+		Topics: []string{"topic1"},
+	}
+
+	expected := encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 1}}, 1)
+
+	actual, err := s.AssignmentData("consumer1", map[string][]int32{"topic1": {0, 1}}, 1)
+	if err != nil {
+		t.Errorf("Error building assignment data: %v", err)
+	}
+	if bytes.Compare(expected, actual) != 0 {
+		t.Error("Invalid assignment data returned from AssignmentData")
+	}
+}
+
 func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopics(b *testing.B) {
 	s := &stickyBalanceStrategy{}
 	r := rand.New(rand.NewSource(time.Now().UnixNano()))

+ 1 - 1
consumer_group.go

@@ -334,7 +334,7 @@ func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrate
 	strategy := c.config.Consumer.Group.Rebalance.Strategy
 	for memberID, topics := range plan {
 		assignment := &ConsumerGroupMemberAssignment{Topics: topics}
-		userDataBytes, err := strategy.AssignmentData(plan, memberID, generationID)
+		userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
 		if err != nil {
 			return nil, err
 		}