package sarama

import (
	"bytes"
	"fmt"
	"math"
	"math/rand"
	"reflect"
	"sort"
	"testing"
	"time"
)

func TestBalanceStrategyRange(t *testing.T) {
	tests := []struct {
		members  map[string][]string
		topics   map[string][]int32
		expected BalanceStrategyPlan
	}{
		{
			members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
			topics:  map[string][]int32{"T1": {0, 1, 2, 3}, "T2": {0, 1, 2, 3}},
			expected: BalanceStrategyPlan{
				"M1": map[string][]int32{"T1": {0, 1}, "T2": {2, 3}},
				"M2": map[string][]int32{"T1": {2, 3}, "T2": {0, 1}},
			},
		},
		{
			members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
			topics:  map[string][]int32{"T1": {0, 1, 2}, "T2": {0, 1, 2}},
			expected: BalanceStrategyPlan{
				"M1": map[string][]int32{"T1": {0, 1}, "T2": {2}},
				"M2": map[string][]int32{"T1": {2}, "T2": {0, 1}},
			},
		},
		{
			members: map[string][]string{"M1": {"T1"}, "M2": {"T1", "T2"}},
			topics:  map[string][]int32{"T1": {0, 1}, "T2": {0, 1}},
			expected: BalanceStrategyPlan{
				"M1": map[string][]int32{"T1": {0}},
				"M2": map[string][]int32{"T1": {1}, "T2": {0, 1}},
			},
		},
	}

	strategy := BalanceStrategyRange
	if strategy.Name() != "range" {
		t.Errorf("Unexpected stategy name\nexpected: range\nactual: %v", strategy.Name())
	}

	for _, test := range tests {
		members := make(map[string]ConsumerGroupMemberMetadata)
		for memberID, topics := range test.members {
			members[memberID] = ConsumerGroupMemberMetadata{Topics: topics}
		}

		actual, err := strategy.Plan(members, test.topics)
		if err != nil {
			t.Errorf("Unexpected error %v", err)
		} else if !reflect.DeepEqual(actual, test.expected) {
			t.Errorf("Plan does not match expectation\nexpected: %#v\nactual: %#v", test.expected, actual)
		}
	}
}

func TestBalanceStrategyRangeAssignmentData(t *testing.T) {
	strategy := BalanceStrategyRange

	members := make(map[string]ConsumerGroupMemberMetadata, 2)
	members["consumer1"] = ConsumerGroupMemberMetadata{
		Topics: []string{"topic1"},
	}
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics: []string{"topic1"},
	}

	actual, err := strategy.AssignmentData("consumer1", map[string][]int32{"topic1": {0, 1}}, 1)
	if err != nil {
		t.Errorf("Error building assignment data: %v", err)
	}
	if actual != nil {
		t.Error("Invalid assignment data returned from AssignmentData")
	}
}

func TestBalanceStrategyRoundRobin(t *testing.T) {
	tests := []struct {
		members  map[string][]string
		topics   map[string][]int32
		expected BalanceStrategyPlan
	}{
		{
			members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
			topics:  map[string][]int32{"T1": {0, 1, 2, 3}, "T2": {0, 1, 2, 3}},
			expected: BalanceStrategyPlan{
				"M1": map[string][]int32{"T1": {0, 2}, "T2": {1, 3}},
				"M2": map[string][]int32{"T1": {1, 3}, "T2": {0, 2}},
			},
		},
		{
			members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
			topics:  map[string][]int32{"T1": {0, 1, 2}, "T2": {0, 1, 2}},
			expected: BalanceStrategyPlan{
				"M1": map[string][]int32{"T1": {0, 2}, "T2": {1}},
				"M2": map[string][]int32{"T1": {1}, "T2": {0, 2}},
			},
		},
	}

	strategy := BalanceStrategyRoundRobin
	if strategy.Name() != "roundrobin" {
		t.Errorf("Unexpected stategy name\nexpected: range\nactual: %v", strategy.Name())
	}

	for _, test := range tests {
		members := make(map[string]ConsumerGroupMemberMetadata)
		for memberID, topics := range test.members {
			members[memberID] = ConsumerGroupMemberMetadata{Topics: topics}
		}

		actual, err := strategy.Plan(members, test.topics)
		if err != nil {
			t.Errorf("Unexpected error %v", err)
		} else if !reflect.DeepEqual(actual, test.expected) {
			t.Errorf("Plan does not match expectation\nexpected: %#v\nactual: %#v", test.expected, actual)
		}
	}
}

func Test_deserializeTopicPartitionAssignment(t *testing.T) {
	type args struct {
		userDataBytes []byte
	}
	tests := []struct {
		name    string
		args    args
		want    StickyAssignorUserData
		wantErr bool
	}{
		{
			name: "Nil userdata bytes",
			args: args{},
			want: &StickyAssignorUserDataV1{},
		},
		{
			name: "Non-empty invalid userdata bytes",
			args: args{
				userDataBytes: []byte{
					0x00, 0x00,
					0x00, 0x00, 0x00, 0x01,
					0x00, 0x03, 'f', 'o', 'o',
				},
			},
			wantErr: true,
		},
		{
			name: "Valid v0 userdata bytes",
			args: args{
				userDataBytes: []byte{
					0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
					0x33, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
					0x05,
				},
			},
			want: &StickyAssignorUserDataV0{
				Topics: map[string][]int32{"t03": {5}},
				topicPartitions: []topicPartitionAssignment{
					{
						Topic:     "t03",
						Partition: 5,
					},
				},
			},
		},
		{
			name: "Valid v1 userdata bytes",
			args: args{
				userDataBytes: []byte{
					0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
					0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
					0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff,
					0xff,
				},
			},
			want: &StickyAssignorUserDataV1{
				Topics:     map[string][]int32{"t06": {0, 4}},
				Generation: -1,
				topicPartitions: []topicPartitionAssignment{
					{
						Topic:     "t06",
						Partition: 0,
					},
					{
						Topic:     "t06",
						Partition: 4,
					},
				},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := deserializeTopicPartitionAssignment(tt.args.userDataBytes)
			if (err != nil) != tt.wantErr {
				t.Errorf("deserializeTopicPartitionAssignment() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if !reflect.DeepEqual(got, tt.want) {
				t.Errorf("deserializeTopicPartitionAssignment() = %v, want %v", got, tt.want)
			}
		})
	}
}

func TestBalanceStrategyRoundRobinAssignmentData(t *testing.T) {
	strategy := BalanceStrategyRoundRobin

	members := make(map[string]ConsumerGroupMemberMetadata, 2)
	members["consumer1"] = ConsumerGroupMemberMetadata{
		Topics: []string{"topic1"},
	}
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics: []string{"topic1"},
	}

	actual, err := strategy.AssignmentData("consumer1", map[string][]int32{"topic1": {0, 1}}, 1)
	if err != nil {
		t.Errorf("Error building assignment data: %v", err)
	}
	if actual != nil {
		t.Error("Invalid assignment data returned from AssignmentData")
	}
}

func Test_prepopulateCurrentAssignments(t *testing.T) {
	type args struct {
		members map[string]ConsumerGroupMemberMetadata
	}
	tests := []struct {
		name                   string
		args                   args
		wantCurrentAssignments map[string][]topicPartitionAssignment
		wantPrevAssignments    map[topicPartitionAssignment]consumerGenerationPair
		wantErr                bool
	}{
		{
			name:                   "Empty map",
			wantCurrentAssignments: map[string][]topicPartitionAssignment{},
			wantPrevAssignments:    map[topicPartitionAssignment]consumerGenerationPair{},
		},
		{
			name: "Single consumer",
			args: args{
				members: map[string]ConsumerGroupMemberMetadata{
					"c01": {
						Version: 2,
						UserData: []byte{
							0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
							0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
							0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff,
							0xff,
						},
					},
				},
			},
			wantCurrentAssignments: map[string][]topicPartitionAssignment{
				"c01": {
					{
						Topic:     "t06",
						Partition: 0,
					},
					{
						Topic:     "t06",
						Partition: 4,
					},
				},
			},
			wantPrevAssignments: map[topicPartitionAssignment]consumerGenerationPair{},
		},
		{
			name: "Duplicate consumer assignments in metadata",
			args: args{
				members: map[string]ConsumerGroupMemberMetadata{
					"c01": {
						Version: 2,
						UserData: []byte{
							0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
							0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
							0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff,
							0xff,
						},
					},
					"c02": {
						Version: 2,
						UserData: []byte{
							0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
							0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
							0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff,
							0xff,
						},
					},
				},
			},
			wantCurrentAssignments: map[string][]topicPartitionAssignment{
				"c01": {
					{
						Topic:     "t06",
						Partition: 0,
					},
					{
						Topic:     "t06",
						Partition: 4,
					},
				},
			},
			wantPrevAssignments: map[topicPartitionAssignment]consumerGenerationPair{},
		},
		{
			name: "Different generations (5, 6) of consumer assignments in metadata",
			args: args{
				members: map[string]ConsumerGroupMemberMetadata{
					"c01": {
						Version: 2,
						UserData: []byte{
							0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
							0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
							0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00,
							0x05,
						},
					},
					"c02": {
						Version: 2,
						UserData: []byte{
							0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x74, 0x30,
							0x36, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
							0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00,
							0x06,
						},
					},
				},
			},
			wantCurrentAssignments: map[string][]topicPartitionAssignment{
				"c01": {
					{
						Topic:     "t06",
						Partition: 0,
					},
					{
						Topic:     "t06",
						Partition: 4,
					},
				},
			},
			wantPrevAssignments: map[topicPartitionAssignment]consumerGenerationPair{
				{
					Topic:     "t06",
					Partition: 0,
				}: {
					Generation: 5,
					MemberID:   "c01",
				},
				{
					Topic:     "t06",
					Partition: 4,
				}: {
					Generation: 5,
					MemberID:   "c01",
				},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			_, gotPrevAssignments, err := prepopulateCurrentAssignments(tt.args.members)

			if (err != nil) != tt.wantErr {
				t.Errorf("prepopulateCurrentAssignments() error = %v, wantErr %v", err, tt.wantErr)
			}

			if !reflect.DeepEqual(gotPrevAssignments, tt.wantPrevAssignments) {
				t.Errorf("deserializeTopicPartitionAssignment() prevAssignments = %v, want %v", gotPrevAssignments, tt.wantPrevAssignments)
			}
		})
	}
}

func Test_areSubscriptionsIdentical(t *testing.T) {
	type args struct {
		partition2AllPotentialConsumers map[topicPartitionAssignment][]string
		consumer2AllPotentialPartitions map[string][]topicPartitionAssignment
	}
	tests := []struct {
		name string
		args args
		want bool
	}{
		{
			name: "Empty consumers and partitions",
			args: args{
				partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
				consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
			},
			want: true,
		},
		{
			name: "Topic partitions with identical consumer entries",
			args: args{
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
					{Topic: "t1", Partition: 1}: {"c1", "c2", "c3"},
					{Topic: "t1", Partition: 2}: {"c1", "c2", "c3"},
				},
				consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
			},
			want: true,
		},
		{
			name: "Topic partitions with mixed up consumer entries",
			args: args{
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
					{Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
					{Topic: "t1", Partition: 2}: {"c3", "c1", "c2"},
				},
				consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
			},
			want: true,
		},
		{
			name: "Topic partitions with different consumer entries",
			args: args{
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
					{Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
					{Topic: "t1", Partition: 2}: {"cX", "c1", "c2"},
				},
				consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
			},
			want: false,
		},
		{
			name: "Topic partitions with different number of consumer entries",
			args: args{
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
					{Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
					{Topic: "t1", Partition: 2}: {"c1", "c2"},
				},
				consumer2AllPotentialPartitions: make(map[string][]topicPartitionAssignment),
			},
			want: false,
		},
		{
			name: "Consumers with identical topic partitions",
			args: args{
				partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
					"c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
					"c2": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
					"c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
				},
			},
			want: true,
		},
		{
			name: "Consumer2 with mixed up consumer entries",
			args: args{
				partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
					"c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
					"c2": {{Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}, {Topic: "t1", Partition: 0}},
					"c3": {{Topic: "t1", Partition: 2}, {Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}},
				},
			},
			want: true,
		},
		{
			name: "Consumer2 with different consumer entries",
			args: args{
				partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
					"c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
					"c2": {{Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}, {Topic: "t1", Partition: 0}},
					"c3": {{Topic: "tX", Partition: 2}, {Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}},
				},
			},
			want: false,
		},
		{
			name: "Consumer2 with different number of consumer entries",
			args: args{
				partition2AllPotentialConsumers: make(map[topicPartitionAssignment][]string),
				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
					"c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
					"c2": {{Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}, {Topic: "t1", Partition: 0}},
					"c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}},
				},
			},
			want: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := areSubscriptionsIdentical(tt.args.partition2AllPotentialConsumers, tt.args.consumer2AllPotentialPartitions); got != tt.want {
				t.Errorf("areSubscriptionsIdentical() = %v, want %v", got, tt.want)
			}
		})
	}
}

func Test_sortMemberIDsByPartitionAssignments(t *testing.T) {
	type args struct {
		assignments map[string][]topicPartitionAssignment
	}
	tests := []struct {
		name string
		args args
		want []string
	}{
		{
			name: "Null assignments",
			want: make([]string, 0),
		},
		{
			name: "Single assignment",
			args: args{
				assignments: map[string][]topicPartitionAssignment{
					"c1": {
						{Topic: "t1", Partition: 0},
						{Topic: "t1", Partition: 1},
						{Topic: "t1", Partition: 2},
					},
				},
			},
			want: []string{"c1"},
		},
		{
			name: "Multiple assignments with different partition counts",
			args: args{
				assignments: map[string][]topicPartitionAssignment{
					"c1": {
						{Topic: "t1", Partition: 0},
					},
					"c2": {
						{Topic: "t1", Partition: 1},
						{Topic: "t1", Partition: 2},
					},
					"c3": {
						{Topic: "t1", Partition: 3},
						{Topic: "t1", Partition: 4},
						{Topic: "t1", Partition: 5},
					},
				},
			},
			want: []string{"c1", "c2", "c3"},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := sortMemberIDsByPartitionAssignments(tt.args.assignments); !reflect.DeepEqual(got, tt.want) {
				t.Errorf("sortMemberIDsByPartitionAssignments() = %v, want %v", got, tt.want)
			}
		})
	}
}

func Test_sortPartitions(t *testing.T) {
	type args struct {
		currentAssignment                          map[string][]topicPartitionAssignment
		partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair
		isFreshAssignment                          bool
		partition2AllPotentialConsumers            map[topicPartitionAssignment][]string
		consumer2AllPotentialPartitions            map[string][]topicPartitionAssignment
	}
	tests := []struct {
		name string
		args args
		want []topicPartitionAssignment
	}{
		{
			name: "Empty everything",
			want: make([]topicPartitionAssignment, 0),
		},
		{
			name: "Base case",
			args: args{
				currentAssignment: map[string][]topicPartitionAssignment{
					"c1": {{Topic: "t1", Partition: 0}},
					"c2": {{Topic: "t1", Partition: 1}},
					"c3": {{Topic: "t1", Partition: 2}},
				},
				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
					"c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
					"c2": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
					"c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
				},
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
					{Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
					{Topic: "t1", Partition: 2}: {"c3", "c1", "c2"},
				},
			},
		},
		{
			name: "Partitions assigned to a different consumer last time",
			args: args{
				currentAssignment: map[string][]topicPartitionAssignment{
					"c1": {{Topic: "t1", Partition: 0}},
				},
				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
					"c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
					"c2": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
					"c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
				},
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
					{Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
					{Topic: "t1", Partition: 2}: {"c3", "c1", "c2"},
				},
				partitionsWithADifferentPreviousAssignment: map[topicPartitionAssignment]consumerGenerationPair{
					{Topic: "t1", Partition: 0}: {Generation: 1, MemberID: "c2"},
				},
			},
		},
		{
			name: "Partitions assigned to a different consumer last time",
			args: args{
				currentAssignment: map[string][]topicPartitionAssignment{
					"c1": {{Topic: "t1", Partition: 0}},
					"c2": {{Topic: "t1", Partition: 1}},
				},
				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
					"c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
					"c2": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
					"c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
				},
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
					{Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
					{Topic: "t1", Partition: 2}: {"c3", "c1", "c2"},
				},
				partitionsWithADifferentPreviousAssignment: map[topicPartitionAssignment]consumerGenerationPair{
					{Topic: "t1", Partition: 0}: {Generation: 1, MemberID: "c2"},
				},
			},
		},
		{
			name: "Fresh assignment",
			args: args{
				isFreshAssignment: true,
				currentAssignment: map[string][]topicPartitionAssignment{},
				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
					"c1": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
					"c2": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
					"c3": {{Topic: "t1", Partition: 0}, {Topic: "t1", Partition: 1}, {Topic: "t1", Partition: 2}},
				},
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{Topic: "t1", Partition: 0}: {"c1", "c2", "c3"},
					{Topic: "t1", Partition: 1}: {"c2", "c3", "c1"},
					{Topic: "t1", Partition: 2}: {"c3", "c1", "c2"},
				},
				partitionsWithADifferentPreviousAssignment: map[topicPartitionAssignment]consumerGenerationPair{
					{Topic: "t1", Partition: 0}: {Generation: 1, MemberID: "c2"},
				},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got := sortPartitions(tt.args.currentAssignment, tt.args.partitionsWithADifferentPreviousAssignment, tt.args.isFreshAssignment, tt.args.partition2AllPotentialConsumers, tt.args.consumer2AllPotentialPartitions)
			if tt.want != nil && !reflect.DeepEqual(got, tt.want) {
				t.Errorf("sortPartitions() = %v, want %v", got, tt.want)
			}
		})
	}
}

func Test_filterAssignedPartitions(t *testing.T) {
	type args struct {
		currentAssignment               map[string][]topicPartitionAssignment
		partition2AllPotentialConsumers map[topicPartitionAssignment][]string
	}
	tests := []struct {
		name string
		args args
		want map[string][]topicPartitionAssignment
	}{
		{
			name: "All partitions accounted for",
			args: args{
				currentAssignment: map[string][]topicPartitionAssignment{
					"c1": {{Topic: "t1", Partition: 0}},
					"c2": {{Topic: "t1", Partition: 1}},
				},
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{Topic: "t1", Partition: 0}: {"c1"},
					{Topic: "t1", Partition: 1}: {"c2"},
				},
			},
			want: map[string][]topicPartitionAssignment{
				"c1": {{Topic: "t1", Partition: 0}},
				"c2": {{Topic: "t1", Partition: 1}},
			},
		},
		{
			name: "One consumer using an unrecognized partition",
			args: args{
				currentAssignment: map[string][]topicPartitionAssignment{
					"c1": {{Topic: "t1", Partition: 0}},
					"c2": {{Topic: "t1", Partition: 1}},
				},
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{Topic: "t1", Partition: 0}: {"c1"},
				},
			},
			want: map[string][]topicPartitionAssignment{
				"c1": {{Topic: "t1", Partition: 0}},
				"c2": {},
			},
		},
		{
			name: "Interleaved consumer removal",
			args: args{
				currentAssignment: map[string][]topicPartitionAssignment{
					"c1": {{Topic: "t1", Partition: 0}},
					"c2": {{Topic: "t1", Partition: 1}},
					"c3": {{Topic: "t1", Partition: 2}},
				},
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{Topic: "t1", Partition: 0}: {"c1"},
					{Topic: "t1", Partition: 2}: {"c3"},
				},
			},
			want: map[string][]topicPartitionAssignment{
				"c1": {{Topic: "t1", Partition: 0}},
				"c2": {},
				"c3": {{Topic: "t1", Partition: 2}},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := filterAssignedPartitions(tt.args.currentAssignment, tt.args.partition2AllPotentialConsumers); !reflect.DeepEqual(got, tt.want) {
				t.Errorf("filterAssignedPartitions() = %v, want %v", got, tt.want)
			}
		})
	}
}

func Test_canConsumerParticipateInReassignment(t *testing.T) {
	type args struct {
		memberID                        string
		currentAssignment               map[string][]topicPartitionAssignment
		consumer2AllPotentialPartitions map[string][]topicPartitionAssignment
		partition2AllPotentialConsumers map[topicPartitionAssignment][]string
	}
	tests := []struct {
		name string
		args args
		want bool
	}{
		{
			name: "Consumer has been assigned partitions not available to it",
			args: args{
				memberID: "c1",
				currentAssignment: map[string][]topicPartitionAssignment{
					"c1": {
						{Topic: "t1", Partition: 0},
						{Topic: "t1", Partition: 1},
						{Topic: "t1", Partition: 2},
					},
					"c2": {},
				},
				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
					"c1": {
						{Topic: "t1", Partition: 0},
						{Topic: "t1", Partition: 1},
					},
					"c2": {
						{Topic: "t1", Partition: 0},
						{Topic: "t1", Partition: 1},
						{Topic: "t1", Partition: 2},
					},
				},
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{Topic: "t1", Partition: 0}: {"c1", "c2"},
					{Topic: "t1", Partition: 1}: {"c1", "c2"},
					{Topic: "t1", Partition: 2}: {"c2"},
				},
			},
			want: true,
		},
		{
			name: "Consumer has been assigned all available partitions",
			args: args{
				memberID: "c1",
				currentAssignment: map[string][]topicPartitionAssignment{
					"c1": {
						{Topic: "t1", Partition: 0},
						{Topic: "t1", Partition: 1},
					},
				},
				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
					"c1": {
						{Topic: "t1", Partition: 0},
						{Topic: "t1", Partition: 1},
					},
				},
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{Topic: "t1", Partition: 0}: {"c1"},
					{Topic: "t1", Partition: 1}: {"c1"},
				},
			},
			want: false,
		},
		{
			name: "Consumer has not been assigned all available partitions",
			args: args{
				memberID: "c1",
				currentAssignment: map[string][]topicPartitionAssignment{
					"c1": {
						{Topic: "t1", Partition: 0},
						{Topic: "t1", Partition: 1},
					},
				},
				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
					"c1": {
						{Topic: "t1", Partition: 0},
						{Topic: "t1", Partition: 1},
						{Topic: "t1", Partition: 2},
					},
				},
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{Topic: "t1", Partition: 0}: {"c1"},
					{Topic: "t1", Partition: 1}: {"c1"},
					{Topic: "t1", Partition: 2}: {"c1"},
				},
			},
			want: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := canConsumerParticipateInReassignment(tt.args.memberID, tt.args.currentAssignment, tt.args.consumer2AllPotentialPartitions, tt.args.partition2AllPotentialConsumers); got != tt.want {
				t.Errorf("canConsumerParticipateInReassignment() = %v, want %v", got, tt.want)
			}
		})
	}
}

func Test_removeTopicPartitionFromMemberAssignments(t *testing.T) {
	type args struct {
		assignments []topicPartitionAssignment
		topic       topicPartitionAssignment
	}
	tests := []struct {
		name string
		args args
		want []topicPartitionAssignment
	}{
		{
			name: "Empty",
			args: args{
				assignments: make([]topicPartitionAssignment, 0),
				topic:       topicPartitionAssignment{Topic: "t1", Partition: 0},
			},
			want: make([]topicPartitionAssignment, 0),
		},
		{
			name: "Remove first entry",
			args: args{
				assignments: []topicPartitionAssignment{
					{Topic: "t1", Partition: 0},
					{Topic: "t1", Partition: 1},
					{Topic: "t1", Partition: 2},
				},
				topic: topicPartitionAssignment{Topic: "t1", Partition: 0},
			},
			want: []topicPartitionAssignment{
				{Topic: "t1", Partition: 1},
				{Topic: "t1", Partition: 2},
			},
		},
		{
			name: "Remove middle entry",
			args: args{
				assignments: []topicPartitionAssignment{
					{Topic: "t1", Partition: 0},
					{Topic: "t1", Partition: 1},
					{Topic: "t1", Partition: 2},
				},
				topic: topicPartitionAssignment{Topic: "t1", Partition: 1},
			},
			want: []topicPartitionAssignment{
				{Topic: "t1", Partition: 0},
				{Topic: "t1", Partition: 2},
			},
		},
		{
			name: "Remove last entry",
			args: args{
				assignments: []topicPartitionAssignment{
					{Topic: "t1", Partition: 0},
					{Topic: "t1", Partition: 1},
					{Topic: "t1", Partition: 2},
				},
				topic: topicPartitionAssignment{Topic: "t1", Partition: 2},
			},
			want: []topicPartitionAssignment{
				{Topic: "t1", Partition: 0},
				{Topic: "t1", Partition: 1},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := removeTopicPartitionFromMemberAssignments(tt.args.assignments, tt.args.topic); !reflect.DeepEqual(got, tt.want) {
				t.Errorf("removeTopicPartitionFromMemberAssignments() = %v, want %v", got, tt.want)
			}
		})
	}
}

func Test_assignPartition(t *testing.T) {
	type args struct {
		partition                       topicPartitionAssignment
		sortedCurrentSubscriptions      []string
		currentAssignment               map[string][]topicPartitionAssignment
		consumer2AllPotentialPartitions map[string][]topicPartitionAssignment
		currentPartitionConsumer        map[topicPartitionAssignment]string
	}
	tests := []struct {
		name                         string
		args                         args
		want                         []string
		wantCurrentAssignment        map[string][]topicPartitionAssignment
		wantCurrentPartitionConsumer map[topicPartitionAssignment]string
	}{
		{
			name: "Base",
			args: args{
				partition:                  topicPartitionAssignment{Topic: "t1", Partition: 2},
				sortedCurrentSubscriptions: []string{"c3", "c1", "c2"},
				currentAssignment: map[string][]topicPartitionAssignment{
					"c1": {
						{Topic: "t1", Partition: 0},
					},
					"c2": {
						{Topic: "t1", Partition: 1},
					},
					"c3": {},
				},
				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
					"c1": {
						{Topic: "t1", Partition: 0},
					},
					"c2": {
						{Topic: "t1", Partition: 1},
					},
					"c3": {
						{Topic: "t1", Partition: 2},
					},
				},
				currentPartitionConsumer: map[topicPartitionAssignment]string{
					{Topic: "t1", Partition: 0}: "c1",
					{Topic: "t1", Partition: 1}: "c2",
				},
			},
			want: []string{"c1", "c2", "c3"},
			wantCurrentAssignment: map[string][]topicPartitionAssignment{
				"c1": {
					{Topic: "t1", Partition: 0},
				},
				"c2": {
					{Topic: "t1", Partition: 1},
				},
				"c3": {
					{Topic: "t1", Partition: 2},
				},
			},
			wantCurrentPartitionConsumer: map[topicPartitionAssignment]string{
				{Topic: "t1", Partition: 0}: "c1",
				{Topic: "t1", Partition: 1}: "c2",
				{Topic: "t1", Partition: 2}: "c3",
			},
		},
		{
			name: "Unassignable Partition",
			args: args{
				partition:                  topicPartitionAssignment{Topic: "t1", Partition: 3},
				sortedCurrentSubscriptions: []string{"c3", "c1", "c2"},
				currentAssignment: map[string][]topicPartitionAssignment{
					"c1": {
						{Topic: "t1", Partition: 0},
					},
					"c2": {
						{Topic: "t1", Partition: 1},
					},
					"c3": {},
				},
				consumer2AllPotentialPartitions: map[string][]topicPartitionAssignment{
					"c1": {
						{Topic: "t1", Partition: 0},
					},
					"c2": {
						{Topic: "t1", Partition: 1},
					},
					"c3": {
						{Topic: "t1", Partition: 2},
					},
				},
				currentPartitionConsumer: map[topicPartitionAssignment]string{
					{Topic: "t1", Partition: 0}: "c1",
					{Topic: "t1", Partition: 1}: "c2",
				},
			},
			want: []string{"c3", "c1", "c2"},
			wantCurrentAssignment: map[string][]topicPartitionAssignment{
				"c1": {
					{Topic: "t1", Partition: 0},
				},
				"c2": {
					{Topic: "t1", Partition: 1},
				},
				"c3": {},
			},
			wantCurrentPartitionConsumer: map[topicPartitionAssignment]string{
				{Topic: "t1", Partition: 0}: "c1",
				{Topic: "t1", Partition: 1}: "c2",
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := assignPartition(tt.args.partition, tt.args.sortedCurrentSubscriptions, tt.args.currentAssignment, tt.args.consumer2AllPotentialPartitions, tt.args.currentPartitionConsumer); !reflect.DeepEqual(got, tt.want) {
				t.Errorf("assignPartition() = %v, want %v", got, tt.want)
			}
			if !reflect.DeepEqual(tt.args.currentAssignment, tt.wantCurrentAssignment) {
				t.Errorf("assignPartition() currentAssignment = %v, want %v", tt.args.currentAssignment, tt.wantCurrentAssignment)
			}
			if !reflect.DeepEqual(tt.args.currentPartitionConsumer, tt.wantCurrentPartitionConsumer) {
				t.Errorf("assignPartition() currentPartitionConsumer = %v, want %v", tt.args.currentPartitionConsumer, tt.wantCurrentPartitionConsumer)
			}
		})
	}
}

func Test_stickyBalanceStrategy_Plan(t *testing.T) {
	type args struct {
		members map[string]ConsumerGroupMemberMetadata
		topics  map[string][]int32
	}
	tests := []struct {
		name string
		s    *stickyBalanceStrategy
		args args
	}{
		{
			name: "One consumer with no topics",
			args: args{
				members: map[string]ConsumerGroupMemberMetadata{
					"consumer": {},
				},
				topics: make(map[string][]int32),
			},
		},
		{
			name: "One consumer with non-existent topic",
			args: args{
				members: map[string]ConsumerGroupMemberMetadata{
					"consumer": {
						Topics: []string{"topic"},
					},
				},
				topics: map[string][]int32{
					"topic": make([]int32, 0),
				},
			},
		},
		{
			name: "One consumer with one topic",
			args: args{
				members: map[string]ConsumerGroupMemberMetadata{
					"consumer": {
						Topics: []string{"topic"},
					},
				},
				topics: map[string][]int32{
					"topic": {0, 1, 2},
				},
			},
		},
		{
			name: "Only assigns partitions from subscribed topics",
			args: args{
				members: map[string]ConsumerGroupMemberMetadata{
					"consumer": {
						Topics: []string{"topic"},
					},
				},
				topics: map[string][]int32{
					"topic": {0, 1, 2},
					"other": {0, 1, 2},
				},
			},
		},
		{
			name: "One consumer with multiple topics",
			args: args{
				members: map[string]ConsumerGroupMemberMetadata{
					"consumer": {
						Topics: []string{"topic1", "topic2"},
					},
				},
				topics: map[string][]int32{
					"topic1": {0},
					"topic2": {0, 1},
				},
			},
		},
		{
			name: "Two consumers with one topic and one partition",
			args: args{
				members: map[string]ConsumerGroupMemberMetadata{
					"consumer1": {
						Topics: []string{"topic"},
					},
					"consumer2": {
						Topics: []string{"topic"},
					},
				},
				topics: map[string][]int32{
					"topic": {0},
				},
			},
		},
		{
			name: "Two consumers with one topic and two partitions",
			args: args{
				members: map[string]ConsumerGroupMemberMetadata{
					"consumer1": {
						Topics: []string{"topic"},
					},
					"consumer2": {
						Topics: []string{"topic"},
					},
				},
				topics: map[string][]int32{
					"topic": {0, 1},
				},
			},
		},
		{
			name: "Multiple consumers with mixed topic subscriptions",
			args: args{
				members: map[string]ConsumerGroupMemberMetadata{
					"consumer1": {
						Topics: []string{"topic1"},
					},
					"consumer2": {
						Topics: []string{"topic1", "topic2"},
					},
					"consumer3": {
						Topics: []string{"topic1"},
					},
				},
				topics: map[string][]int32{
					"topic1": {0, 1, 2},
					"topic2": {0, 1},
				},
			},
		},
		{
			name: "Two consumers with two topics and six partitions",
			args: args{
				members: map[string]ConsumerGroupMemberMetadata{
					"consumer1": {
						Topics: []string{"topic1", "topic2"},
					},
					"consumer2": {
						Topics: []string{"topic1", "topic2"},
					},
				},
				topics: map[string][]int32{
					"topic1": {0, 1, 2},
					"topic2": {0, 1, 2},
				},
			},
		},
		{
			name: "Three consumers (two old, one new) with one topic and twelve partitions",
			args: args{
				members: map[string]ConsumerGroupMemberMetadata{
					"consumer1": {
						Topics:   []string{"topic1"},
						UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {4, 11, 8, 5, 9, 2}}, 1),
					},
					"consumer2": {
						Topics:   []string{"topic1"},
						UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {1, 3, 0, 7, 10, 6}}, 1),
					},
					"consumer3": {
						Topics: []string{"topic1"},
					},
				},
				topics: map[string][]int32{
					"topic1": {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
				},
			},
		},
		{
			name: "Three consumers (two old, one new) with one topic and 13 partitions",
			args: args{
				members: map[string]ConsumerGroupMemberMetadata{
					"consumer1": {
						Topics:   []string{"topic1"},
						UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {4, 11, 8, 5, 9, 2, 6}}, 1),
					},
					"consumer2": {
						Topics:   []string{"topic1"},
						UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {1, 3, 0, 7, 10, 12}}, 1),
					},
					"consumer3": {
						Topics: []string{"topic1"},
					},
				},
				topics: map[string][]int32{
					"topic1": {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
				},
			},
		},
		{
			name: "One consumer that is no longer subscribed to a topic that it had previously been consuming from",
			args: args{
				members: map[string]ConsumerGroupMemberMetadata{
					"consumer1": {
						Topics:   []string{"topic2"},
						UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0}}, 1),
					},
				},
				topics: map[string][]int32{
					"topic1": {0},
					"topic2": {0},
				},
			},
		},
		{
			name: "Two consumers where one is no longer interested in consuming from a topic that it had been consuming from",
			args: args{
				members: map[string]ConsumerGroupMemberMetadata{
					"consumer1": {
						Topics:   []string{"topic2"},
						UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0}}, 1),
					},
					"consumer2": {
						Topics:   []string{"topic1", "topic2"},
						UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {1}}, 1),
					},
				},
				topics: map[string][]int32{
					"topic1": {0, 1},
					"topic2": {0, 1},
				},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			s := &stickyBalanceStrategy{}
			plan, err := s.Plan(tt.args.members, tt.args.topics)
			verifyPlanIsBalancedAndSticky(t, s, tt.args.members, plan, err)
			verifyFullyBalanced(t, plan)
		})
	}
}

func Test_stickyBalanceStrategy_Plan_KIP54_ExampleOne(t *testing.T) {
	s := &stickyBalanceStrategy{}

	// PLAN 1
	members := map[string]ConsumerGroupMemberMetadata{
		"consumer1": {
			Topics: []string{"topic1", "topic2", "topic3", "topic4"},
		},
		"consumer2": {
			Topics: []string{"topic1", "topic2", "topic3", "topic4"},
		},
		"consumer3": {
			Topics: []string{"topic1", "topic2", "topic3", "topic4"},
		},
	}
	topics := map[string][]int32{
		"topic1": {0, 1},
		"topic2": {0, 1},
		"topic3": {0, 1},
		"topic4": {0, 1},
	}
	plan1, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
	verifyFullyBalanced(t, plan1)

	// PLAN 2
	delete(members, "consumer1")
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1", "topic2", "topic3", "topic4"},
		UserData: encodeSubscriberPlan(t, plan1["consumer2"]),
	}
	members["consumer3"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1", "topic2", "topic3", "topic4"},
		UserData: encodeSubscriberPlan(t, plan1["consumer3"]),
	}
	plan2, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
	verifyFullyBalanced(t, plan2)
}

func Test_stickyBalanceStrategy_Plan_KIP54_ExampleTwo(t *testing.T) {
	s := &stickyBalanceStrategy{}

	// PLAN 1
	members := map[string]ConsumerGroupMemberMetadata{
		"consumer1": {
			Topics: []string{"topic1"},
		},
		"consumer2": {
			Topics: []string{"topic1", "topic2"},
		},
		"consumer3": {
			Topics: []string{"topic1", "topic2", "topic3"},
		},
	}
	topics := map[string][]int32{
		"topic1": {0},
		"topic2": {0, 1},
		"topic3": {0, 1, 2},
	}
	plan1, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
	if len(plan1["consumer1"]["topic1"]) != 1 || len(plan1["consumer2"]["topic2"]) != 2 || len(plan1["consumer3"]["topic3"]) != 3 {
		t.Error("Incorrect distribution of topic partition assignments")
	}

	// PLAN 2
	delete(members, "consumer1")
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics:   members["consumer2"].Topics,
		UserData: encodeSubscriberPlan(t, plan1["consumer2"]),
	}
	members["consumer3"] = ConsumerGroupMemberMetadata{
		Topics:   members["consumer3"].Topics,
		UserData: encodeSubscriberPlan(t, plan1["consumer3"]),
	}
	plan2, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
	verifyFullyBalanced(t, plan2)
	if len(plan2["consumer2"]["topic1"]) != 1 || len(plan2["consumer2"]["topic2"]) != 2 || len(plan2["consumer3"]["topic3"]) != 3 {
		t.Error("Incorrect distribution of topic partition assignments")
	}
}

func Test_stickyBalanceStrategy_Plan_KIP54_ExampleThree(t *testing.T) {
	s := &stickyBalanceStrategy{}
	topicNames := []string{"topic1", "topic2"}

	// PLAN 1
	members := map[string]ConsumerGroupMemberMetadata{
		"consumer1": {
			Topics: topicNames,
		},
		"consumer2": {
			Topics: topicNames,
		},
	}
	topics := map[string][]int32{
		"topic1": {0, 1},
		"topic2": {0, 1},
	}
	plan1, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)

	// PLAN 2
	members["consumer1"] = ConsumerGroupMemberMetadata{
		Topics: topicNames,
	}
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics:   topicNames,
		UserData: encodeSubscriberPlan(t, plan1["consumer2"]),
	}
	members["consumer3"] = ConsumerGroupMemberMetadata{
		Topics:   topicNames,
		UserData: encodeSubscriberPlan(t, plan1["consumer3"]),
	}
	plan2, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
	verifyFullyBalanced(t, plan2)
}

func Test_stickyBalanceStrategy_Plan_AddRemoveConsumerOneTopic(t *testing.T) {
	s := &stickyBalanceStrategy{}

	// PLAN 1
	members := map[string]ConsumerGroupMemberMetadata{
		"consumer1": {
			Topics: []string{"topic"},
		},
	}
	topics := map[string][]int32{
		"topic": {0, 1, 2},
	}
	plan1, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)

	// PLAN 2
	members["consumer1"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic"},
		UserData: encodeSubscriberPlan(t, plan1["consumer1"]),
	}
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics: []string{"topic"},
	}
	plan2, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)

	// PLAN 3
	delete(members, "consumer1")
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic"},
		UserData: encodeSubscriberPlan(t, plan2["consumer2"]),
	}
	plan3, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan3, err)
}

func Test_stickyBalanceStrategy_Plan_PoorRoundRobinAssignmentScenario(t *testing.T) {
	s := &stickyBalanceStrategy{}

	// PLAN 1
	members := map[string]ConsumerGroupMemberMetadata{
		"consumer1": {
			Topics: []string{"topic1", "topic2", "topic3", "topic4", "topic5"},
		},
		"consumer2": {
			Topics: []string{"topic1", "topic3", "topic5"},
		},
		"consumer3": {
			Topics: []string{"topic1", "topic3", "topic5"},
		},
		"consumer4": {
			Topics: []string{"topic1", "topic2", "topic3", "topic4", "topic5"},
		},
	}
	topics := make(map[string][]int32, 5)
	for i := 1; i <= 5; i++ {
		partitions := make([]int32, i%2+1)
		for j := 0; j < i%2+1; j++ {
			partitions[j] = int32(j)
		}
		topics[fmt.Sprintf("topic%d", i)] = partitions
	}

	plan, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
}

func Test_stickyBalanceStrategy_Plan_AddRemoveTopicTwoConsumers(t *testing.T) {
	s := &stickyBalanceStrategy{}

	// PLAN 1
	members := map[string]ConsumerGroupMemberMetadata{
		"consumer1": {
			Topics: []string{"topic1"},
		},
		"consumer2": {
			Topics: []string{"topic1"},
		},
	}
	topics := map[string][]int32{
		"topic1": {0, 1, 2},
	}
	plan1, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
	verifyFullyBalanced(t, plan1)

	// PLAN 2
	members["consumer1"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1", "topic2"},
		UserData: encodeSubscriberPlan(t, plan1["consumer1"]),
	}
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1", "topic2"},
		UserData: encodeSubscriberPlan(t, plan1["consumer2"]),
	}
	topics["topic2"] = []int32{0, 1, 2}

	plan2, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
	verifyFullyBalanced(t, plan2)

	// PLAN 3
	members["consumer1"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1", "topic2"},
		UserData: encodeSubscriberPlan(t, plan2["consumer1"]),
	}
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1", "topic2"},
		UserData: encodeSubscriberPlan(t, plan2["consumer2"]),
	}
	delete(topics, "topic1")

	plan3, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan3, err)
	verifyFullyBalanced(t, plan3)
}

func Test_stickyBalanceStrategy_Plan_ReassignmentAfterOneConsumerLeaves(t *testing.T) {
	s := &stickyBalanceStrategy{}

	// PLAN 1
	members := make(map[string]ConsumerGroupMemberMetadata, 20)
	for i := 0; i < 20; i++ {
		topics := make([]string, 20)
		for j := 0; j < 20; j++ {
			topics[j] = fmt.Sprintf("topic%d", j)
		}
		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
	}
	topics := make(map[string][]int32, 20)
	for i := 0; i < 20; i++ {
		partitions := make([]int32, 20)
		for j := 0; j < 20; j++ {
			partitions[j] = int32(j)
		}
		topics[fmt.Sprintf("topic%d", i)] = partitions
	}

	plan1, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)

	for i := 0; i < 20; i++ {
		topics := make([]string, 20)
		for j := 0; j < 20; j++ {
			topics[j] = fmt.Sprintf("topic%d", j)
		}
		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
			Topics:   members[fmt.Sprintf("consumer%d", i)].Topics,
			UserData: encodeSubscriberPlan(t, plan1[fmt.Sprintf("consumer%d", i)]),
		}
	}
	delete(members, "consumer10")

	plan2, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
}

func Test_stickyBalanceStrategy_Plan_ReassignmentAfterOneConsumerAdded(t *testing.T) {
	s := &stickyBalanceStrategy{}

	// PLAN 1
	members := make(map[string]ConsumerGroupMemberMetadata)
	for i := 0; i < 10; i++ {
		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: []string{"topic1"}}
	}
	partitions := make([]int32, 20)
	for j := 0; j < 20; j++ {
		partitions[j] = int32(j)
	}
	topics := map[string][]int32{"topic1": partitions}

	plan1, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)

	// add a new consumer
	members["consumer10"] = ConsumerGroupMemberMetadata{Topics: []string{"topic1"}}

	plan2, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
}

func Test_stickyBalanceStrategy_Plan_SameSubscriptions(t *testing.T) {
	s := &stickyBalanceStrategy{}

	// PLAN 1
	members := make(map[string]ConsumerGroupMemberMetadata, 20)
	for i := 0; i < 9; i++ {
		topics := make([]string, 15)
		for j := 0; j < 15; j++ {
			topics[j] = fmt.Sprintf("topic%d", j)
		}
		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
	}
	topics := make(map[string][]int32, 15)
	for i := 0; i < 15; i++ {
		partitions := make([]int32, i)
		for j := 0; j < i; j++ {
			partitions[j] = int32(j)
		}
		topics[fmt.Sprintf("topic%d", i)] = partitions
	}

	plan1, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)

	// PLAN 2
	for i := 0; i < 9; i++ {
		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
			Topics:   members[fmt.Sprintf("consumer%d", i)].Topics,
			UserData: encodeSubscriberPlan(t, plan1[fmt.Sprintf("consumer%d", i)]),
		}
	}
	delete(members, "consumer5")

	plan2, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
}

func Test_stickyBalanceStrategy_Plan_LargeAssignmentWithMultipleConsumersLeaving(t *testing.T) {
	s := &stickyBalanceStrategy{}
	r := rand.New(rand.NewSource(time.Now().UnixNano()))

	// PLAN 1
	members := make(map[string]ConsumerGroupMemberMetadata, 20)
	for i := 0; i < 200; i++ {
		topics := make([]string, 200)
		for j := 0; j < 200; j++ {
			topics[j] = fmt.Sprintf("topic%d", j)
		}
		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
	}
	topics := make(map[string][]int32, 40)
	for i := 0; i < 40; i++ {
		partitionCount := r.Intn(20)
		partitions := make([]int32, partitionCount)
		for j := 0; j < partitionCount; j++ {
			partitions[j] = int32(j)
		}
		topics[fmt.Sprintf("topic%d", i)] = partitions
	}

	plan1, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)

	for i := 0; i < 200; i++ {
		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
			Topics:   members[fmt.Sprintf("consumer%d", i)].Topics,
			UserData: encodeSubscriberPlan(t, plan1[fmt.Sprintf("consumer%d", i)]),
		}
	}
	for i := 0; i < 50; i++ {
		delete(members, fmt.Sprintf("consumer%d", i))
	}

	plan2, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
}

func Test_stickyBalanceStrategy_Plan_NewSubscription(t *testing.T) {
	s := &stickyBalanceStrategy{}

	members := make(map[string]ConsumerGroupMemberMetadata, 20)
	for i := 0; i < 3; i++ {
		topics := make([]string, 0)
		for j := i; j <= 3*i-2; j++ {
			topics = append(topics, fmt.Sprintf("topic%d", j))
		}
		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
	}
	topics := make(map[string][]int32, 5)
	for i := 1; i < 5; i++ {
		topics[fmt.Sprintf("topic%d", i)] = []int32{0}
	}

	plan1, err := s.Plan(members, topics)
	if err != nil {
		t.Errorf("stickyBalanceStrategy.Plan() error = %v", err)
		return
	}
	verifyValidityAndBalance(t, members, plan1)

	members["consumer0"] = ConsumerGroupMemberMetadata{Topics: []string{"topic1"}}

	plan2, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
}

func Test_stickyBalanceStrategy_Plan_ReassignmentWithRandomSubscriptionsAndChanges(t *testing.T) {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))

	minNumConsumers := 20
	maxNumConsumers := 40
	minNumTopics := 10
	maxNumTopics := 20

	for round := 0; round < 100; round++ {
		numTopics := minNumTopics + r.Intn(maxNumTopics-minNumTopics)
		topics := make([]string, numTopics)
		partitionsPerTopic := make(map[string][]int32, numTopics)
		for i := 0; i < numTopics; i++ {
			topicName := fmt.Sprintf("topic%d", i)
			topics[i] = topicName
			partitions := make([]int32, maxNumTopics)
			for j := 0; j < maxNumTopics; j++ {
				partitions[j] = int32(j)
			}
			partitionsPerTopic[topicName] = partitions
		}

		numConsumers := minNumConsumers + r.Intn(maxNumConsumers-minNumConsumers)
		members := make(map[string]ConsumerGroupMemberMetadata, numConsumers)
		for i := 0; i < numConsumers; i++ {
			sub := getRandomSublist(r, topics)
			sort.Strings(sub)
			members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: sub}
		}

		s := &stickyBalanceStrategy{}
		plan, err := s.Plan(members, partitionsPerTopic)
		verifyPlanIsBalancedAndSticky(t, s, members, plan, err)

		// PLAN 2
		membersPlan2 := make(map[string]ConsumerGroupMemberMetadata, numConsumers)
		for i := 0; i < numConsumers; i++ {
			sub := getRandomSublist(r, topics)
			sort.Strings(sub)
			membersPlan2[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
				Topics:   sub,
				UserData: encodeSubscriberPlan(t, plan[fmt.Sprintf("consumer%d", i)]),
			}
		}
		plan2, err := s.Plan(membersPlan2, partitionsPerTopic)
		verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
	}
}

func Test_stickyBalanceStrategy_Plan_MoveExistingAssignments(t *testing.T) {
	s := &stickyBalanceStrategy{}

	topics := make(map[string][]int32, 6)
	for i := 1; i <= 6; i++ {
		topics[fmt.Sprintf("topic%d", i)] = []int32{0}
	}
	members := make(map[string]ConsumerGroupMemberMetadata, 3)
	members["consumer1"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1", "topic2"},
		UserData: encodeSubscriberPlan(t, map[string][]int32{"topic1": {0}}),
	}
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1", "topic2", "topic3", "topic4"},
		UserData: encodeSubscriberPlan(t, map[string][]int32{"topic2": {0}, "topic3": {0}}),
	}
	members["consumer3"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic2", "topic3", "topic4", "topic5", "topic6"},
		UserData: encodeSubscriberPlan(t, map[string][]int32{"topic4": {0}, "topic5": {0}, "topic6": {0}}),
	}

	plan, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
}

func Test_stickyBalanceStrategy_Plan_Stickiness(t *testing.T) {
	s := &stickyBalanceStrategy{}

	topics := map[string][]int32{"topic1": {0, 1, 2}}
	members := map[string]ConsumerGroupMemberMetadata{
		"consumer1": {Topics: []string{"topic1"}},
		"consumer2": {Topics: []string{"topic1"}},
		"consumer3": {Topics: []string{"topic1"}},
		"consumer4": {Topics: []string{"topic1"}},
	}

	plan1, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)

	// PLAN 2
	// remove the potential group leader
	delete(members, "consumer1")
	for i := 2; i <= 4; i++ {
		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
			Topics:   []string{"topic1"},
			UserData: encodeSubscriberPlan(t, plan1[fmt.Sprintf("consumer%d", i)]),
		}
	}

	plan2, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
}

func Test_stickyBalanceStrategy_Plan_AssignmentUpdatedForDeletedTopic(t *testing.T) {
	s := &stickyBalanceStrategy{}

	topics := make(map[string][]int32, 2)
	topics["topic1"] = []int32{0}
	topics["topic3"] = make([]int32, 100)
	for i := 0; i < 100; i++ {
		topics["topic3"][i] = int32(i)
	}
	members := map[string]ConsumerGroupMemberMetadata{
		"consumer1": {Topics: []string{"topic1", "topic2", "topic3"}},
	}

	plan, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
	verifyFullyBalanced(t, plan)
	if (len(plan["consumer1"]["topic1"]) + len(plan["consumer1"]["topic3"])) != 101 {
		t.Error("Incorrect number of partitions assigned")
		return
	}
}

func Test_stickyBalanceStrategy_Plan_NoExceptionRaisedWhenOnlySubscribedTopicDeleted(t *testing.T) {
	s := &stickyBalanceStrategy{}

	topics := map[string][]int32{"topic1": {0, 1, 2}}
	members := map[string]ConsumerGroupMemberMetadata{
		"consumer1": {Topics: []string{"topic1"}},
	}
	plan1, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)

	// PLAN 2
	members["consumer1"] = ConsumerGroupMemberMetadata{
		Topics:   members["consumer1"].Topics,
		UserData: encodeSubscriberPlan(t, plan1["consumer1"]),
	}

	plan2, err := s.Plan(members, map[string][]int32{})
	if len(plan2) != 1 {
		t.Error("Incorrect number of consumers")
		return
	}
	if len(plan2["consumer1"]) != 0 {
		t.Error("Incorrect number of consumer topic assignments")
		return
	}
	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
}

func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations1(t *testing.T) {
	s := &stickyBalanceStrategy{}

	topics := map[string][]int32{"topic1": {0, 1, 2, 3, 4, 5}}
	members := map[string]ConsumerGroupMemberMetadata{
		"consumer1": {Topics: []string{"topic1"}},
		"consumer2": {Topics: []string{"topic1"}},
		"consumer3": {Topics: []string{"topic1"}},
	}
	plan1, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
	verifyFullyBalanced(t, plan1)

	// PLAN 2
	members["consumer1"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1"},
		UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer1"], 1),
	}
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1"},
		UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer2"], 1),
	}
	delete(members, "consumer3")

	plan2, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
	verifyFullyBalanced(t, plan2)
	if len(intersection(plan1["consumer1"]["topic1"], plan2["consumer1"]["topic1"])) != 2 {
		t.Error("stickyBalanceStrategy.Plan() consumer1 didn't maintain partitions across reassignment")
	}
	if len(intersection(plan1["consumer2"]["topic1"], plan2["consumer2"]["topic1"])) != 2 {
		t.Error("stickyBalanceStrategy.Plan() consumer1 didn't maintain partitions across reassignment")
	}

	// PLAN 3
	delete(members, "consumer1")
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1"},
		UserData: encodeSubscriberPlanWithGeneration(t, plan2["consumer2"], 2),
	}
	members["consumer3"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1"},
		UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer3"], 1),
	}

	plan3, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan3, err)
	verifyFullyBalanced(t, plan3)
}

func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations2(t *testing.T) {
	s := &stickyBalanceStrategy{}

	topics := map[string][]int32{"topic1": {0, 1, 2, 3, 4, 5}}
	members := map[string]ConsumerGroupMemberMetadata{
		"consumer1": {Topics: []string{"topic1"}},
		"consumer2": {Topics: []string{"topic1"}},
		"consumer3": {Topics: []string{"topic1"}},
	}
	plan1, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan1, err)
	verifyFullyBalanced(t, plan1)

	// PLAN 2
	delete(members, "consumer1")
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1"},
		UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer2"], 1),
	}
	delete(members, "consumer3")

	plan2, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan2, err)
	verifyFullyBalanced(t, plan2)
	if len(intersection(plan1["consumer2"]["topic1"], plan2["consumer2"]["topic1"])) != 2 {
		t.Error("stickyBalanceStrategy.Plan() consumer1 didn't maintain partitions across reassignment")
	}

	// PLAN 3
	members["consumer1"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1"},
		UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer1"], 1),
	}
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1"},
		UserData: encodeSubscriberPlanWithGeneration(t, plan2["consumer2"], 2),
	}
	members["consumer3"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1"},
		UserData: encodeSubscriberPlanWithGeneration(t, plan1["consumer3"], 1),
	}
	plan3, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan3, err)
	verifyFullyBalanced(t, plan3)
}
func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGenerations(t *testing.T) {
	s := &stickyBalanceStrategy{}

	topics := map[string][]int32{"topic1": {0, 1, 2, 3, 4, 5}}
	members := make(map[string]ConsumerGroupMemberMetadata, 3)
	members["consumer1"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1"},
		UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 1, 4}}, 1),
	}
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1"},
		UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 2, 3}}, 1),
	}
	members["consumer3"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1"},
		UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {3, 4, 5}}, 2),
	}

	plan, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
	verifyFullyBalanced(t, plan)
}

func Test_stickyBalanceStrategy_Plan_SchemaBackwardCompatibility(t *testing.T) {
	s := &stickyBalanceStrategy{}

	topics := map[string][]int32{"topic1": {0, 1, 2}}
	members := make(map[string]ConsumerGroupMemberMetadata, 3)
	members["consumer1"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1"},
		UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 2}}, 1),
	}
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1"},
		UserData: encodeSubscriberPlanWithOldSchema(t, map[string][]int32{"topic1": {1}}),
	}
	members["consumer3"] = ConsumerGroupMemberMetadata{Topics: []string{"topic1"}}

	plan, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
	verifyFullyBalanced(t, plan)
}

func Test_stickyBalanceStrategy_Plan_ConflictingPreviousAssignments(t *testing.T) {
	s := &stickyBalanceStrategy{}

	topics := map[string][]int32{"topic1": {0, 1}}
	members := make(map[string]ConsumerGroupMemberMetadata, 2)
	members["consumer1"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1"},
		UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 1}}, 1),
	}
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics:   []string{"topic1"},
		UserData: encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 1}}, 1),
	}

	plan, err := s.Plan(members, topics)
	verifyPlanIsBalancedAndSticky(t, s, members, plan, err)
	verifyFullyBalanced(t, plan)
}

func Test_stickyBalanceStrategy_Plan_AssignmentData(t *testing.T) {
	s := &stickyBalanceStrategy{}

	members := make(map[string]ConsumerGroupMemberMetadata, 2)
	members["consumer1"] = ConsumerGroupMemberMetadata{
		Topics: []string{"topic1"},
	}
	members["consumer2"] = ConsumerGroupMemberMetadata{
		Topics: []string{"topic1"},
	}

	expected := encodeSubscriberPlanWithGeneration(t, map[string][]int32{"topic1": {0, 1}}, 1)

	actual, err := s.AssignmentData("consumer1", map[string][]int32{"topic1": {0, 1}}, 1)
	if err != nil {
		t.Errorf("Error building assignment data: %v", err)
	}
	if !bytes.Equal(expected, actual) {
		t.Error("Invalid assignment data returned from AssignmentData")
	}
}

func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopics(b *testing.B) {
	s := &stickyBalanceStrategy{}
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	members := make(map[string]ConsumerGroupMemberMetadata, 20)
	for i := 0; i < 200; i++ {
		topics := make([]string, 200)
		for j := 0; j < 200; j++ {
			topics[j] = fmt.Sprintf("topic%d", j)
		}
		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
	}
	topics := make(map[string][]int32, 40)
	for i := 0; i < 40; i++ {
		partitionCount := r.Intn(20)
		partitions := make([]int32, partitionCount)
		for j := 0; j < partitionCount; j++ {
			partitions[j] = int32(j)
		}
		topics[fmt.Sprintf("topic%d", i)] = partitions
	}
	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		if _, err := s.Plan(members, topics); err != nil {
			b.Errorf("Error building plan in benchmark: %v", err)
		}
	}
}

func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopicsAndExistingAssignments(b *testing.B) {
	s := &stickyBalanceStrategy{}
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	members := make(map[string]ConsumerGroupMemberMetadata, 20)
	for i := 0; i < 200; i++ {
		topics := make([]string, 200)
		for j := 0; j < 200; j++ {
			topics[j] = fmt.Sprintf("topic%d", j)
		}
		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{Topics: topics}
	}
	topics := make(map[string][]int32, 40)
	for i := 0; i < 40; i++ {
		partitionCount := r.Intn(20)
		partitions := make([]int32, partitionCount)
		for j := 0; j < partitionCount; j++ {
			partitions[j] = int32(j)
		}
		topics[fmt.Sprintf("topic%d", i)] = partitions
	}
	plan, _ := s.Plan(members, topics)

	for i := 0; i < 200; i++ {
		members[fmt.Sprintf("consumer%d", i)] = ConsumerGroupMemberMetadata{
			Topics:   members[fmt.Sprintf("consumer%d", i)].Topics,
			UserData: encodeSubscriberPlanWithGenerationForBenchmark(b, plan[fmt.Sprintf("consumer%d", i)], 1),
		}
	}
	for i := 0; i < 1; i++ {
		delete(members, fmt.Sprintf("consumer%d", i))
	}
	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		if _, err := s.Plan(members, topics); err != nil {
			b.Errorf("Error building plan in benchmark: %v", err)
		}
	}
}

func verifyPlanIsBalancedAndSticky(t *testing.T, s *stickyBalanceStrategy, members map[string]ConsumerGroupMemberMetadata, plan BalanceStrategyPlan, err error) {
	if err != nil {
		t.Errorf("stickyBalanceStrategy.Plan() error = %v", err)
		return
	}
	if !s.movements.isSticky() {
		t.Error("stickyBalanceStrategy.Plan() not sticky")
		return
	}
	verifyValidityAndBalance(t, members, plan)
}

func verifyValidityAndBalance(t *testing.T, consumers map[string]ConsumerGroupMemberMetadata, plan BalanceStrategyPlan) {
	size := len(consumers)
	if size != len(plan) {
		t.Errorf("Subscription size (%d) not equal to plan size (%d)", size, len(plan))
		t.FailNow()
	}

	members := make([]string, size)
	i := 0
	for memberID := range consumers {
		members[i] = memberID
		i++
	}
	sort.Strings(members)

	for i, memberID := range members {
		for assignedTopic := range plan[memberID] {
			found := false
			for _, assignableTopic := range consumers[memberID].Topics {
				if assignableTopic == assignableTopic {
					found = true
					break
				}
			}
			if !found {
				t.Errorf("Consumer %s had assigned topic %s that wasn't in the list of assignable topics", memberID, assignedTopic)
				t.FailNow()
			}
		}

		// skip last consumer
		if i == len(members)-1 {
			continue
		}

		consumerAssignments := make([]topicPartitionAssignment, 0)
		for topic, partitions := range plan[memberID] {
			for _, partition := range partitions {
				consumerAssignments = append(consumerAssignments, topicPartitionAssignment{Topic: topic, Partition: partition})
			}
		}

		for j := i + 1; j < size; j++ {
			otherConsumer := members[j]
			otherConsumerAssignments := make([]topicPartitionAssignment, 0)
			for topic, partitions := range plan[otherConsumer] {
				for _, partition := range partitions {
					otherConsumerAssignments = append(otherConsumerAssignments, topicPartitionAssignment{Topic: topic, Partition: partition})
				}
			}
			assignmentsIntersection := intersection(consumerAssignments, otherConsumerAssignments)
			if len(assignmentsIntersection) > 0 {
				t.Errorf("Consumers %s and %s have common partitions assigned to them: %v", memberID, otherConsumer, assignmentsIntersection)
				t.FailNow()
			}

			if math.Abs(float64(len(consumerAssignments)-len(otherConsumerAssignments))) <= 1 {
				continue
			}

			if len(consumerAssignments) > len(otherConsumerAssignments) {
				for _, topic := range consumerAssignments {
					if _, exists := plan[otherConsumer][topic.Topic]; exists {
						t.Errorf("Some partitions can be moved from %s to %s to achieve a better balance, %s has %d assignments, and %s has %d assignments", otherConsumer, memberID, memberID, len(consumerAssignments), otherConsumer, len(otherConsumerAssignments))
						t.FailNow()
					}
				}
			}

			if len(otherConsumerAssignments) > len(consumerAssignments) {
				for _, topic := range otherConsumerAssignments {
					if _, exists := plan[memberID][topic.Topic]; exists {
						t.Errorf("Some partitions can be moved from %s to %s to achieve a better balance, %s has %d assignments, and %s has %d assignments", memberID, otherConsumer, otherConsumer, len(otherConsumerAssignments), memberID, len(consumerAssignments))
						t.FailNow()
					}
				}
			}
		}
	}
}

// Produces the intersection of two slices
// From https://github.com/juliangruber/go-intersect
func intersection(a interface{}, b interface{}) []interface{} {
	set := make([]interface{}, 0)
	hash := make(map[interface{}]bool)
	av := reflect.ValueOf(a)
	bv := reflect.ValueOf(b)

	for i := 0; i < av.Len(); i++ {
		el := av.Index(i).Interface()
		hash[el] = true
	}

	for i := 0; i < bv.Len(); i++ {
		el := bv.Index(i).Interface()
		if _, found := hash[el]; found {
			set = append(set, el)
		}
	}

	return set
}

func encodeSubscriberPlan(t *testing.T, assignments map[string][]int32) []byte {
	return encodeSubscriberPlanWithGeneration(t, assignments, defaultGeneration)
}

func encodeSubscriberPlanWithGeneration(t *testing.T, assignments map[string][]int32, generation int32) []byte {
	userDataBytes, err := encode(&StickyAssignorUserDataV1{
		Topics:     assignments,
		Generation: generation,
	}, nil)
	if err != nil {
		t.Errorf("encodeSubscriberPlan error = %v", err)
		t.FailNow()
	}
	return userDataBytes
}

func encodeSubscriberPlanWithGenerationForBenchmark(b *testing.B, assignments map[string][]int32, generation int32) []byte {
	userDataBytes, err := encode(&StickyAssignorUserDataV1{
		Topics:     assignments,
		Generation: generation,
	}, nil)
	if err != nil {
		b.Errorf("encodeSubscriberPlan error = %v", err)
		b.FailNow()
	}
	return userDataBytes
}

func encodeSubscriberPlanWithOldSchema(t *testing.T, assignments map[string][]int32) []byte {
	userDataBytes, err := encode(&StickyAssignorUserDataV0{
		Topics: assignments,
	}, nil)
	if err != nil {
		t.Errorf("encodeSubscriberPlan error = %v", err)
		t.FailNow()
	}
	return userDataBytes
}

// verify that the plan is fully balanced, assumes that all consumers can
// consume from the same set of topics
func verifyFullyBalanced(t *testing.T, plan BalanceStrategyPlan) {
	min := math.MaxInt32
	max := math.MinInt32
	for _, topics := range plan {
		assignedPartitionsCount := 0
		for _, partitions := range topics {
			assignedPartitionsCount += len(partitions)
		}
		if assignedPartitionsCount < min {
			min = assignedPartitionsCount
		}
		if assignedPartitionsCount > max {
			max = assignedPartitionsCount
		}
	}
	if (max - min) > 1 {
		t.Errorf("Plan partition assignment is not fully balanced: min=%d, max=%d", min, max)
	}
}

func getRandomSublist(r *rand.Rand, s []string) []string {
	howManyToRemove := r.Intn(len(s))
	allEntriesMap := make(map[int]string)
	for i, s := range s {
		allEntriesMap[i] = s
	}
	for i := 0; i < howManyToRemove; i++ {
		delete(allEntriesMap, r.Intn(len(allEntriesMap)))
	}

	subList := make([]string, len(allEntriesMap))
	i := 0
	for _, s := range allEntriesMap {
		subList[i] = s
		i++
	}
	return subList
}

func Test_sortPartitionsByPotentialConsumerAssignments(t *testing.T) {
	type args struct {
		partition2AllPotentialConsumers map[topicPartitionAssignment][]string
	}
	tests := []struct {
		name string
		args args
		want []topicPartitionAssignment
	}{
		{
			name: "Single topic partition",
			args: args{
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{
						Topic:     "t1",
						Partition: 0,
					}: {"c1", "c2"},
				},
			},
			want: []topicPartitionAssignment{
				{
					Topic:     "t1",
					Partition: 0,
				},
			},
		},
		{
			name: "Multiple topic partitions with the same number of consumers but different topic names",
			args: args{
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{
						Topic:     "t1",
						Partition: 0,
					}: {"c1", "c2"},
					{
						Topic:     "t2",
						Partition: 0,
					}: {"c1", "c2"},
				},
			},
			want: []topicPartitionAssignment{
				{
					Topic:     "t1",
					Partition: 0,
				},
				{
					Topic:     "t2",
					Partition: 0,
				},
			},
		},
		{
			name: "Multiple topic partitions with the same number of consumers and topic names",
			args: args{
				partition2AllPotentialConsumers: map[topicPartitionAssignment][]string{
					{
						Topic:     "t1",
						Partition: 0,
					}: {"c1", "c2"},
					{
						Topic:     "t1",
						Partition: 1,
					}: {"c1", "c2"},
				},
			},
			want: []topicPartitionAssignment{
				{
					Topic:     "t1",
					Partition: 0,
				},
				{
					Topic:     "t1",
					Partition: 1,
				},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := sortPartitionsByPotentialConsumerAssignments(tt.args.partition2AllPotentialConsumers); !reflect.DeepEqual(got, tt.want) {
				t.Errorf("sortPartitionsByPotentialConsumerAssignments() = %v, want %v", got, tt.want)
			}
		})
	}
}