Explorar o código

fix: use the broker for any admin on BrokerConfig

When operating on Broker configuration via the Admin API, the request
_must_ be sent to the specific broker that the change applies to, _not_
just (as usual) to the Controller.
Dominic Evans %!s(int64=4) %!d(string=hai) anos
pai
achega
fb8b9b5a10
Modificáronse 4 ficheiros con 196 adicións e 18 borrados
  1. 45 2
      admin.go
  2. 113 0
      admin_test.go
  3. 11 15
      config_resource_type.go
  4. 27 1
      mockresponses.go

+ 45 - 2
admin.go

@@ -2,7 +2,9 @@ package sarama
 
 import (
 	"errors"
+	"fmt"
 	"math/rand"
+	"strconv"
 	"sync"
 )
 
@@ -226,6 +228,16 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32
 	return response.Brokers, response.ControllerID, nil
 }
 
+func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
+	brokers := ca.client.Brokers()
+	for _, b := range brokers {
+		if b.ID() == id {
+			return b, nil
+		}
+	}
+	return nil, fmt.Errorf("could not find broker id %d", id)
+}
+
 func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
 	brokers := ca.client.Brokers()
 	if len(brokers) > 0 {
@@ -432,6 +444,13 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i
 	return nil
 }
 
+// Returns a bool indicating whether the resource request needs to go to a
+// specific broker
+func dependsOnSpecificNode(resource ConfigResource) bool {
+	return (resource.Type == BrokerResource && resource.Name != "") ||
+		resource.Type == BrokerLoggerResource
+}
+
 func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
 
 	var entries []ConfigEntry
@@ -442,11 +461,23 @@ func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry,
 		Resources: resources,
 	}
 
-	b, err := ca.Controller()
+	var (
+		b   *Broker
+		err error
+	)
+
+	// DescribeConfig of broker/broker logger must be sent to the broker in question
+	if dependsOnSpecificNode(resource) {
+		id, _ := strconv.Atoi(resource.Name)
+		b, err = ca.findBroker(int32(id))
+	} else {
+		b, err = ca.findAnyBroker()
+	}
 	if err != nil {
 		return nil, err
 	}
 
+	_ = b.Open(ca.client.Config())
 	rsp, err := b.DescribeConfigs(request)
 	if err != nil {
 		return nil, err
@@ -479,11 +510,23 @@ func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string
 		ValidateOnly: validateOnly,
 	}
 
-	b, err := ca.Controller()
+	var (
+		b   *Broker
+		err error
+	)
+
+	// AlterConfig of broker/broker logger must be sent to the broker in question
+	if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
+		id, _ := strconv.Atoi(name)
+		b, err = ca.findBroker(int32(id))
+	} else {
+		b, err = ca.findAnyBroker()
+	}
 	if err != nil {
 		return err
 	}
 
+	_ = b.Open(ca.client.Config())
 	rsp, err := b.AlterConfigs(request)
 	if err != nil {
 		return err

+ 113 - 0
admin_test.go

@@ -2,6 +2,10 @@ package sarama
 
 import (
 	"errors"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"os"
 	"strings"
 	"testing"
 )
@@ -511,6 +515,61 @@ func TestClusterAdminDescribeConfig(t *testing.T) {
 	}
 }
 
+// TestClusterAdminDescribeBrokerConfig ensures that a describe broker config
+// is sent to the broker in the resource struct, _not_ the controller
+func TestClusterAdminDescribeBrokerConfig(t *testing.T) {
+	Logger = log.New(os.Stdout, fmt.Sprintf("[%s] ", t.Name()), log.LstdFlags)
+	defer func() { Logger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags) }()
+
+	controllerBroker := NewMockBroker(t, 1)
+	defer controllerBroker.Close()
+	configBroker := NewMockBroker(t, 2)
+	defer configBroker.Close()
+
+	controllerBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(controllerBroker.BrokerID()).
+			SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
+			SetBroker(configBroker.Addr(), configBroker.BrokerID()),
+	})
+
+	configBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(controllerBroker.BrokerID()).
+			SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
+			SetBroker(configBroker.Addr(), configBroker.BrokerID()),
+		"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+	admin, err := NewClusterAdmin(
+		[]string{
+			controllerBroker.Addr(),
+			configBroker.Addr(),
+		}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
+		resource := ConfigResource{Name: "2", Type: resourceType}
+		entries, err := admin.DescribeConfig(resource)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if len(entries) <= 0 {
+			t.Fatal(errors.New("no resource present"))
+		}
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
 func TestClusterAdminAlterConfig(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	defer seedBroker.Close()
@@ -544,6 +603,60 @@ func TestClusterAdminAlterConfig(t *testing.T) {
 	}
 }
 
+func TestClusterAdminAlterBrokerConfig(t *testing.T) {
+	controllerBroker := NewMockBroker(t, 1)
+	defer controllerBroker.Close()
+	configBroker := NewMockBroker(t, 2)
+	defer configBroker.Close()
+
+	controllerBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(controllerBroker.BrokerID()).
+			SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
+			SetBroker(configBroker.Addr(), configBroker.BrokerID()),
+	})
+	configBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(controllerBroker.BrokerID()).
+			SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
+			SetBroker(configBroker.Addr(), configBroker.BrokerID()),
+		"AlterConfigsRequest": NewMockAlterConfigsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+	admin, err := NewClusterAdmin(
+		[]string{
+			controllerBroker.Addr(),
+			configBroker.Addr(),
+		}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var value string
+	entries := make(map[string]*string)
+	value = "3"
+	entries["min.insync.replicas"] = &value
+
+	for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
+		resource := ConfigResource{Name: "2", Type: resourceType}
+		err = admin.AlterConfig(
+			resource.Type,
+			resource.Name,
+			entries,
+			false)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
 func TestClusterAdminCreateAcl(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	defer seedBroker.Close()

+ 11 - 15
config_resource_type.go

@@ -1,22 +1,18 @@
 package sarama
 
-//ConfigResourceType is a type for config resource
+// ConfigResourceType is a type for resources that have configs.
 type ConfigResourceType int8
 
-// Taken from :
-// https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+Describe+and+Alter+Configs+Admin+APIs#KIP-133:DescribeandAlterConfigsAdminAPIs-WireFormattypes
+// Taken from:
+// https://github.com/apache/kafka/blob/ed7c071e07f1f90e4c2895582f61ca090ced3c42/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L32-L55
 
 const (
-	//UnknownResource constant type
-	UnknownResource ConfigResourceType = iota
-	//AnyResource constant type
-	AnyResource
-	//TopicResource constant type
-	TopicResource
-	//GroupResource constant type
-	GroupResource
-	//ClusterResource constant type
-	ClusterResource
-	//BrokerResource constant type
-	BrokerResource
+	// UnknownResource constant type
+	UnknownResource ConfigResourceType = 0
+	// TopicResource constant type
+	TopicResource ConfigResourceType = 2
+	// BrokerResource constant type
+	BrokerResource ConfigResourceType = 4
+	// BrokerLoggerResource constant type
+	BrokerLoggerResource ConfigResourceType = 8
 )

+ 27 - 1
mockresponses.go

@@ -736,6 +736,32 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
 	for _, r := range req.Resources {
 		var configEntries []*ConfigEntry
 		switch r.Type {
+		case BrokerResource:
+			configEntries = append(configEntries,
+				&ConfigEntry{
+					Name:     "min.insync.replicas",
+					Value:    "2",
+					ReadOnly: false,
+					Default:  false,
+				},
+			)
+			res.Resources = append(res.Resources, &ResourceResponse{
+				Name:    r.Name,
+				Configs: configEntries,
+			})
+		case BrokerLoggerResource:
+			configEntries = append(configEntries,
+				&ConfigEntry{
+					Name:     "kafka.controller.KafkaController",
+					Value:    "DEBUG",
+					ReadOnly: false,
+					Default:  false,
+				},
+			)
+			res.Resources = append(res.Resources, &ResourceResponse{
+				Name:    r.Name,
+				Configs: configEntries,
+			})
 		case TopicResource:
 			configEntries = append(configEntries,
 				&ConfigEntry{Name: "max.message.bytes",
@@ -777,7 +803,7 @@ func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {
 
 	for _, r := range req.Resources {
 		res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
-			Type:     TopicResource,
+			Type:     r.Type,
 			ErrorMsg: "",
 		})
 	}