package sarama import ( "errors" "strings" "testing" ) func TestClusterAdmin(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminInvalidController(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), }) config := NewConfig() config.Version = V1_0_0_0 _, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err == nil { t.Fatal(errors.New("controller not set still cluster admin was created")) } if err != ErrControllerNotAvailable { t.Fatal(err) } } func TestClusterAdminCreateTopic(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "CreateTopicsRequest": NewMockCreateTopicsResponse(t), }) config := NewConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false) if err != nil { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "CreateTopicsRequest": NewMockCreateTopicsResponse(t), }) config := NewConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } err = admin.CreateTopic("my_topic", nil, false) if err.Error() != "you must specify topic details" { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "CreateTopicsRequest": NewMockCreateTopicsResponse(t), }) config := NewConfig() config.Version = V0_11_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } err = admin.CreateTopic("_internal_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false) want := "insufficient permissions to create topic with reserved prefix" if !strings.HasSuffix(err.Error(), want) { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminListTopics(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). SetLeader("my_topic", 0, seedBroker.BrokerID()), "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t), }) config := NewConfig() config.Version = V1_1_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } entries, err := admin.ListTopics() if err != nil { t.Fatal(err) } if len(entries) <= 0 { t.Fatal(errors.New("no resource present")) } topic, found := entries["my_topic"] if !found { t.Fatal(errors.New("topic not found in response")) } _, found = topic.ConfigEntries["max.message.bytes"] if found { t.Fatal(errors.New("default topic config entry incorrectly found in response")) } value := topic.ConfigEntries["retention.ms"] if value == nil || *value != "5000" { t.Fatal(errors.New("non-default topic config entry not found in response")) } err = admin.Close() if err != nil { t.Fatal(err) } if topic.ReplicaAssignment == nil || topic.ReplicaAssignment[0][0] != 1 { t.Fatal(errors.New("replica assignment not found in response")) } } func TestClusterAdminDeleteTopic(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t), }) config := NewConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } err = admin.DeleteTopic("my_topic") if err != nil { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminDeleteEmptyTopic(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t), }) config := NewConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } err = admin.DeleteTopic("") if err != ErrInvalidTopic { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminCreatePartitions(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } err = admin.CreatePartitions("my_topic", 3, nil, false) if err != nil { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t), }) config := NewConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } err = admin.CreatePartitions("my_topic", 3, nil, false) if err != ErrUnsupportedVersion { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } err = admin.CreatePartitions("_internal_topic", 3, nil, false) want := "insufficient permissions to create partition on topic with reserved prefix" if !strings.HasSuffix(err.Error(), want) { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminAlterPartitionReassignments(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() secondBroker := NewMockBroker(t, 2) defer secondBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(secondBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). SetBroker(secondBroker.Addr(), secondBroker.BrokerID()), }) secondBroker.SetHandlerByMap(map[string]MockResponse{ "AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t), }) config := NewConfig() config.Version = V2_4_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } var topicAssignment = make([][]int32, 0, 3) err = admin.AlterPartitionReassignments("my_topic", topicAssignment) if err != nil { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() secondBroker := NewMockBroker(t, 2) defer secondBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(secondBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). SetBroker(secondBroker.Addr(), secondBroker.BrokerID()), }) secondBroker.SetHandlerByMap(map[string]MockResponse{ "AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t), }) config := NewConfig() config.Version = V2_3_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } var topicAssignment = make([][]int32, 0, 3) err = admin.AlterPartitionReassignments("my_topic", topicAssignment) if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminListPartitionReassignments(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() secondBroker := NewMockBroker(t, 2) defer secondBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(secondBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). SetBroker(secondBroker.Addr(), secondBroker.BrokerID()), }) secondBroker.SetHandlerByMap(map[string]MockResponse{ "ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t), }) config := NewConfig() config.Version = V2_4_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } response, err := admin.ListPartitionReassignments("my_topic", []int32{0, 1}) if err != nil { t.Fatal(err) } partitionStatus, ok := response["my_topic"] if !ok { t.Fatalf("topic missing in response") } else { if len(partitionStatus) != 2 { t.Fatalf("partition missing in response") } } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() secondBroker := NewMockBroker(t, 2) defer secondBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(secondBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). SetBroker(secondBroker.Addr(), secondBroker.BrokerID()), }) secondBroker.SetHandlerByMap(map[string]MockResponse{ "ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t), }) config := NewConfig() config.Version = V2_3_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } var partitions = make([]int32, 0) _, err = admin.ListPartitionReassignments("my_topic", partitions) if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminDeleteRecords(t *testing.T) { topicName := "my_topic" seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). SetLeader(topicName, 1, 1). SetLeader(topicName, 2, 1). SetLeader(topicName, 3, 1), "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } partitionOffsetFake := make(map[int32]int64) partitionOffsetFake[4] = 1000 errFake := admin.DeleteRecords(topicName, partitionOffsetFake) if errFake == nil { t.Fatal(err) } partitionOffset := make(map[int32]int64) partitionOffset[1] = 1000 partitionOffset[2] = 1000 partitionOffset[3] = 1000 err = admin.DeleteRecords(topicName, partitionOffset) if err != nil { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) { topicName := "my_topic" seedBroker := NewMockBroker(t, 1) secondBroker := NewMockBroker(t, 2) defer seedBroker.Close() defer secondBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). SetBroker(secondBroker.Addr(), secondBroker.brokerID). SetLeader(topicName, 1, 1). SetLeader(topicName, 2, 1). SetLeader(topicName, 3, 2), "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t), }) secondBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). SetBroker(secondBroker.Addr(), secondBroker.brokerID). SetLeader(topicName, 1, 1). SetLeader(topicName, 2, 1). SetLeader(topicName, 3, 2), "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } partitionOffset := make(map[int32]int64) partitionOffset[1] = 1000 partitionOffset[2] = 1000 partitionOffset[3] = 1000 err = admin.DeleteRecords(topicName, partitionOffset) if err != nil { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) { topicName := "my_topic" seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). SetLeader(topicName, 1, 1). SetLeader(topicName, 2, 1). SetLeader(topicName, 3, 1), "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t), }) config := NewConfig() config.Version = V0_10_2_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } partitionOffset := make(map[int32]int64) partitionOffset[1] = 1000 partitionOffset[2] = 1000 partitionOffset[3] = 1000 err = admin.DeleteRecords(topicName, partitionOffset) if !strings.HasPrefix(err.Error(), "kafka server: failed to delete records") { t.Fatal(err) } deleteRecordsError, ok := err.(ErrDeleteRecords) if !ok { t.Fatal(err) } for _, err := range *deleteRecordsError.Errors { if err != ErrUnsupportedVersion { t.Fatal(err) } } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminDescribeConfig(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t), }) var tests = []struct { saramaVersion KafkaVersion requestVersion int16 includeSynonyms bool }{ {V1_0_0_0, 0, false}, {V1_1_0_0, 1, true}, {V1_1_1_0, 1, true}, {V2_0_0_0, 2, true}, } for _, tt := range tests { config := NewConfig() config.Version = tt.saramaVersion admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } defer func() { _ = admin.Close() }() resource := ConfigResource{ Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}, } entries, err := admin.DescribeConfig(resource) if err != nil { t.Fatal(err) } history := seedBroker.History() describeReq, ok := history[len(history)-1].Request.(*DescribeConfigsRequest) if !ok { t.Fatal("failed to find DescribeConfigsRequest in mockBroker history") } if describeReq.Version != tt.requestVersion { t.Fatalf( "requestVersion %v did not match expected %v", describeReq.Version, tt.requestVersion) } if len(entries) <= 0 { t.Fatal(errors.New("no resource present")) } if tt.includeSynonyms { if len(entries[0].Synonyms) == 0 { t.Fatal("expected synonyms to have been included") } } } } func TestClusterAdminDescribeConfigWithErrorCode(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "DescribeConfigsRequest": NewMockDescribeConfigsResponseWithErrorCode(t), }) config := NewConfig() config.Version = V1_1_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } defer func() { _ = admin.Close() }() resource := ConfigResource{ Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}, } _, err = admin.DescribeConfig(resource) if err == nil { t.Fatal(errors.New("ErrorCode present but no Error returned")) } } // TestClusterAdminDescribeBrokerConfig ensures that a describe broker config // is sent to the broker in the resource struct, _not_ the controller func TestClusterAdminDescribeBrokerConfig(t *testing.T) { controllerBroker := NewMockBroker(t, 1) defer controllerBroker.Close() configBroker := NewMockBroker(t, 2) defer configBroker.Close() controllerBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(controllerBroker.BrokerID()). SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()). SetBroker(configBroker.Addr(), configBroker.BrokerID()), }) configBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(controllerBroker.BrokerID()). SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()). SetBroker(configBroker.Addr(), configBroker.BrokerID()), "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin( []string{ controllerBroker.Addr(), configBroker.Addr(), }, config) if err != nil { t.Fatal(err) } for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} { resource := ConfigResource{Name: "2", Type: resourceType} entries, err := admin.DescribeConfig(resource) if err != nil { t.Fatal(err) } if len(entries) <= 0 { t.Fatal(errors.New("no resource present")) } } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminAlterConfig(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "AlterConfigsRequest": NewMockAlterConfigsResponse(t), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } var value string entries := make(map[string]*string) value = "60000" entries["retention.ms"] = &value err = admin.AlterConfig(TopicResource, "my_topic", entries, false) if err != nil { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminAlterConfigWithErrorCode(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "AlterConfigsRequest": NewMockAlterConfigsResponseWithErrorCode(t), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } defer func() { _ = admin.Close() }() var value string entries := make(map[string]*string) value = "60000" entries["retention.ms"] = &value err = admin.AlterConfig(TopicResource, "my_topic", entries, false) if err == nil { t.Fatal(errors.New("ErrorCode present but no Error returned")) } } func TestClusterAdminAlterBrokerConfig(t *testing.T) { controllerBroker := NewMockBroker(t, 1) defer controllerBroker.Close() configBroker := NewMockBroker(t, 2) defer configBroker.Close() controllerBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(controllerBroker.BrokerID()). SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()). SetBroker(configBroker.Addr(), configBroker.BrokerID()), }) configBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(controllerBroker.BrokerID()). SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()). SetBroker(configBroker.Addr(), configBroker.BrokerID()), "AlterConfigsRequest": NewMockAlterConfigsResponse(t), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin( []string{ controllerBroker.Addr(), configBroker.Addr(), }, config) if err != nil { t.Fatal(err) } var value string entries := make(map[string]*string) value = "3" entries["min.insync.replicas"] = &value for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} { resource := ConfigResource{Name: "2", Type: resourceType} err = admin.AlterConfig( resource.Type, resource.Name, entries, false) if err != nil { t.Fatal(err) } } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminCreateAcl(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "CreateAclsRequest": NewMockCreateAclsResponse(t), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"} a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny} err = admin.CreateACL(r, a) if err != nil { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminListAcls(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "DescribeAclsRequest": NewMockListAclsResponse(t), "CreateAclsRequest": NewMockCreateAclsResponse(t), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"} a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny} err = admin.CreateACL(r, a) if err != nil { t.Fatal(err) } resourceName := "my_topic" filter := AclFilter{ ResourceType: AclResourceTopic, Operation: AclOperationRead, ResourceName: &resourceName, } rAcls, err := admin.ListAcls(filter) if err != nil { t.Fatal(err) } if len(rAcls) <= 0 { t.Fatal("no acls present") } err = admin.Close() if err != nil { t.Fatal(err) } } func TestClusterAdminDeleteAcl(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "DeleteAclsRequest": NewMockDeleteAclsResponse(t), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } resourceName := "my_topic" filter := AclFilter{ ResourceType: AclResourceTopic, Operation: AclOperationAlter, ResourceName: &resourceName, } _, err = admin.DeleteACL(filter, false) if err != nil { t.Fatal(err) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestDescribeTopic(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetLeader("my_topic", 0, seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } topics, err := admin.DescribeTopics([]string{"my_topic"}) if err != nil { t.Fatal(err) } if len(topics) != 1 { t.Fatalf("Expected 1 result, got %v", len(topics)) } if topics[0].Name != "my_topic" { t.Fatalf("Incorrect topic name: %v", topics[0].Name) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestDescribeTopicWithVersion0_11(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetLeader("my_topic", 0, seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), }) config := NewConfig() config.Version = V0_11_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } topics, err := admin.DescribeTopics([]string{"my_topic"}) if err != nil { t.Fatal(err) } if len(topics) != 1 { t.Fatalf("Expected 1 result, got %v", len(topics)) } if topics[0].Name != "my_topic" { t.Fatalf("Incorrect topic name: %v", topics[0].Name) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestDescribeConsumerGroup(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() expectedGroupID := "my-group" seedBroker.SetHandlerByMap(map[string]MockResponse{ "DescribeGroupsRequest": NewMockDescribeGroupsResponse(t).AddGroupDescription(expectedGroupID, &GroupDescription{ GroupId: expectedGroupID, }), "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } result, err := admin.DescribeConsumerGroups([]string{expectedGroupID}) if err != nil { t.Fatal(err) } if len(result) != 1 { t.Fatalf("Expected 1 result, got %v", len(result)) } if result[0].GroupId != expectedGroupID { t.Fatalf("Expected groupID %v, got %v", expectedGroupID, result[0].GroupId) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestListConsumerGroups(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "ListGroupsRequest": NewMockListGroupsResponse(t). AddGroup("my-group", "consumer"), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } groups, err := admin.ListConsumerGroups() if err != nil { t.Fatal(err) } if len(groups) != 1 { t.Fatalf("Expected %v results, got %v", 1, len(groups)) } protocolType, ok := groups["my-group"] if !ok { t.Fatal("Expected group to be returned, but it did not") } if protocolType != "consumer" { t.Fatalf("Expected protocolType %v, got %v", "consumer", protocolType) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestListConsumerGroupsMultiBroker(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() secondBroker := NewMockBroker(t, 2) defer secondBroker.Close() firstGroup := "first" secondGroup := "second" nonExistingGroup := "non-existing-group" seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). SetBroker(secondBroker.Addr(), secondBroker.BrokerID()), "ListGroupsRequest": NewMockListGroupsResponse(t). AddGroup(firstGroup, "consumer"), }) secondBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). SetBroker(secondBroker.Addr(), secondBroker.BrokerID()), "ListGroupsRequest": NewMockListGroupsResponse(t). AddGroup(secondGroup, "consumer"), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } groups, err := admin.ListConsumerGroups() if err != nil { t.Fatal(err) } if len(groups) != 2 { t.Fatalf("Expected %v results, got %v", 1, len(groups)) } if _, found := groups[firstGroup]; !found { t.Fatalf("Expected group %v to be present in result set, but it isn't", firstGroup) } if _, found := groups[secondGroup]; !found { t.Fatalf("Expected group %v to be present in result set, but it isn't", secondGroup) } if _, found := groups[nonExistingGroup]; found { t.Fatalf("Expected group %v to not exist, but it exists", nonExistingGroup) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestListConsumerGroupOffsets(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() group := "my-group" topic := "my-topic" partition := int32(0) expectedOffset := int64(0) seedBroker.SetHandlerByMap(map[string]MockResponse{ "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError).SetError(ErrNoError), "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } response, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{ topic: {0}, }) if err != nil { t.Fatalf("ListConsumerGroupOffsets failed with error %v", err) } block := response.GetBlock(topic, partition) if block == nil { t.Fatalf("Expected block for topic %v and partition %v to exist, but it doesn't", topic, partition) } if block.Offset != expectedOffset { t.Fatalf("Expected offset %v, got %v", expectedOffset, block.Offset) } err = admin.Close() if err != nil { t.Fatal(err) } } func TestDeleteConsumerGroup(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() group := "my-group" seedBroker.SetHandlerByMap(map[string]MockResponse{ // "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError), "DeleteGroupsRequest": NewMockDeleteGroupsRequest(t).SetDeletedGroups([]string{group}), "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker), }) config := NewConfig() config.Version = V1_1_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } err = admin.DeleteConsumerGroup(group) if err != nil { t.Fatalf("DeleteConsumerGroup failed with error %v", err) } } // TestRefreshMetaDataWithDifferentController ensures that the cached // controller can be forcibly updated from Metadata by the admin client func TestRefreshMetaDataWithDifferentController(t *testing.T) { seedBroker1 := NewMockBroker(t, 1) seedBroker2 := NewMockBroker(t, 2) defer seedBroker1.Close() defer seedBroker2.Close() seedBroker1.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker1.BrokerID()). SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()). SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()), }) config := NewConfig() config.Version = V1_1_0_0 client, err := NewClient([]string{seedBroker1.Addr()}, config) if err != nil { t.Fatal(err) } ca := clusterAdmin{client: client, conf: config} if b, _ := ca.Controller(); seedBroker1.BrokerID() != b.ID() { t.Fatalf("expected cached controller to be %d rather than %d", seedBroker1.BrokerID(), b.ID()) } seedBroker1.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker2.BrokerID()). SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()). SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()), }) if b, _ := ca.refreshController(); seedBroker2.BrokerID() != b.ID() { t.Fatalf("expected refreshed controller to be %d rather than %d", seedBroker2.BrokerID(), b.ID()) } if b, _ := ca.Controller(); seedBroker2.BrokerID() != b.ID() { t.Fatalf("expected cached controller to be %d rather than %d", seedBroker2.BrokerID(), b.ID()) } } func TestDescribeLogDirs(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), "DescribeLogDirsRequest": NewMockDescribeLogDirsResponse(t). SetLogDirs("/tmp/logs", map[string]int{"topic1": 2, "topic2": 2}), }) config := NewConfig() config.Version = V1_0_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } logDirsPerBroker, err := admin.DescribeLogDirs([]int32{seedBroker.BrokerID()}) if err != nil { t.Fatal(err) } if len(logDirsPerBroker) != 1 { t.Fatalf("Expected %v results, got %v", 1, len(logDirsPerBroker)) } logDirs := logDirsPerBroker[seedBroker.BrokerID()] if len(logDirs) != 1 { t.Fatalf("Expected log dirs for broker %v to be returned, but it did not, got %v", seedBroker.BrokerID(), len(logDirs)) } logDirsBroker := logDirs[0] if logDirsBroker.ErrorCode != ErrNoError { t.Fatalf("Expected no error for broker %v, but it was %v", seedBroker.BrokerID(), logDirsBroker.ErrorCode) } if logDirsBroker.Path != "/tmp/logs" { t.Fatalf("Expected log dirs for broker %v to be '/tmp/logs', but it was %v", seedBroker.BrokerID(), logDirsBroker.Path) } if len(logDirsBroker.Topics) != 2 { t.Fatalf("Expected log dirs for broker %v to have 2 topics, but it had %v", seedBroker.BrokerID(), len(logDirsBroker.Topics)) } err = admin.Close() if err != nil { t.Fatal(err) } }