ソースを参照

add Controller() method to Client interface

justfly 7 年 前
コミット
e06b9a2882
4 ファイル変更123 行追加19 行削除
  1. 61 15
      client.go
  2. 41 0
      client_test.go
  3. 8 0
      errors.go
  4. 13 4
      mockresponses.go

+ 61 - 15
client.go

@@ -17,6 +17,9 @@ type Client interface {
 	// altered after it has been created.
 	Config() *Config
 
+	// Controller returns the cluster controller broker.
+	Controller() (*Broker, error)
+
 	// Brokers returns the current set of active brokers as retrieved from cluster metadata.
 	Brokers() []*Broker
 
@@ -97,6 +100,7 @@ type client struct {
 	seedBrokers []*Broker
 	deadSeeds   []*Broker
 
+	controllerID int32                                   // cluster controller broker id
 	brokers      map[int32]*Broker                       // maps broker ids to brokers
 	metadata     map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
 	coordinators map[string]int32                        // Maps consumer group names to coordinating broker IDs
@@ -379,6 +383,27 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in
 	return offset, err
 }
 
+func (client *client) Controller() (*Broker, error) {
+	if client.Closed() {
+		return nil, ErrClosedClient
+	}
+
+	controller := client.cachedController()
+	if controller == nil {
+		if err := client.refreshMetadata(); err != nil {
+			return nil, err
+		}
+		controller = client.cachedController()
+	}
+
+	if controller == nil {
+		return nil, ErrControllerNotAvailable
+	}
+
+	_ = controller.Open(client.conf)
+	return controller, nil
+}
+
 func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
 	if client.Closed() {
 		return nil, ErrClosedClient
@@ -607,20 +632,7 @@ func (client *client) backgroundMetadataUpdater() {
 	for {
 		select {
 		case <-ticker.C:
-			topics := []string{}
-			if !client.conf.Metadata.Full {
-				if specificTopics, err := client.Topics(); err != nil {
-					Logger.Println("Client background metadata topic load:", err)
-					break
-				} else if len(specificTopics) == 0 {
-					Logger.Println("Client background metadata update: no specific topics to update")
-					break
-				} else {
-					topics = specificTopics
-				}
-			}
-
-			if err := client.RefreshMetadata(topics...); err != nil {
+			if err := client.refreshMetadata(); err != nil {
 				Logger.Println("Client background metadata update:", err)
 			}
 		case <-client.closer:
@@ -629,6 +641,26 @@ func (client *client) backgroundMetadataUpdater() {
 	}
 }
 
+func (client *client) refreshMetadata() error {
+	topics := []string{}
+
+	if !client.conf.Metadata.Full {
+		if specificTopics, err := client.Topics(); err != nil {
+			return err
+		} else if len(specificTopics) == 0 {
+			return ErrNoTopicsToUpdateMetadata
+		} else {
+			topics = specificTopics
+		}
+	}
+
+	if err := client.RefreshMetadata(topics...); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error {
 	retry := func(err error) error {
 		if attemptsRemaining > 0 {
@@ -645,7 +677,12 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
 		} else {
 			Logger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
 		}
-		response, err := broker.GetMetadata(&MetadataRequest{Topics: topics})
+
+		req := &MetadataRequest{Topics: topics}
+		if client.conf.Version.IsAtLeast(V0_10_0_0) {
+			req.Version = 1
+		}
+		response, err := broker.GetMetadata(req)
 
 		switch err.(type) {
 		case nil:
@@ -686,6 +723,8 @@ func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err er
 		client.registerBroker(broker)
 	}
 
+	client.controllerID = data.ControllerID
+
 	for _, topic := range data.Topics {
 		delete(client.metadata, topic.Name)
 		delete(client.cachedPartitionsResults, topic.Name)
@@ -735,6 +774,13 @@ func (client *client) cachedCoordinator(consumerGroup string) *Broker {
 	return nil
 }
 
+func (client *client) cachedController() *Broker {
+	client.lock.RLock()
+	defer client.lock.RUnlock()
+
+	return client.brokers[client.controllerID]
+}
+
 func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
 	retry := func(err error) (*FindCoordinatorResponse, error) {
 		if attemptsRemaining > 0 {

+ 41 - 0
client_test.go

@@ -444,6 +444,47 @@ func TestClientResurrectDeadSeeds(t *testing.T) {
 	safeClose(t, c)
 }
 
+func TestClientController(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+	controllerBroker := NewMockBroker(t, 2)
+	defer controllerBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(controllerBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
+			SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()),
+	})
+
+	cfg := NewConfig()
+
+	// test kafka version greater than 0.10.0.0
+	cfg.Version = V0_10_0_0
+	client1, err := NewClient([]string{seedBroker.Addr()}, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer safeClose(t, client1)
+	broker, err := client1.Controller()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if broker.Addr() != controllerBroker.Addr() {
+		t.Errorf("Expected controller to have address %s, found %s", controllerBroker.Addr(), broker.Addr())
+	}
+
+	// test kafka version earlier than 0.10.0.0
+	cfg.Version = V0_9_0_1
+	client2, err := NewClient([]string{seedBroker.Addr()}, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer safeClose(t, client2)
+	if _, err = client2.Controller(); err != ErrControllerNotAvailable {
+		t.Errorf("Expected Contoller() to return %s, found %s", ErrControllerNotAvailable, err)
+	}
+}
 func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	staleCoordinator := NewMockBroker(t, 2)

+ 8 - 0
errors.go

@@ -41,6 +41,14 @@ var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetc
 // a RecordBatch.
 var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")
 
+// ErrControllerNotAvailable is returned when server didn't give correct controller id. May be kafka server's version
+// is lower than 0.10.0.0.
+var ErrControllerNotAvailable = errors.New("kafka: controller is not avaiable")
+
+// ErrNoTopicsToUpdateMetadata is returned when Meta.Full is set to false but no specific topics were found to update
+// the metadata.
+var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")
+
 // PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
 // if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
 type PacketEncodingError struct {

+ 13 - 4
mockresponses.go

@@ -68,9 +68,10 @@ func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
 
 // MockMetadataResponse is a `MetadataResponse` builder.
 type MockMetadataResponse struct {
-	leaders map[string]map[int32]int32
-	brokers map[string]int32
-	t       TestReporter
+	controllerID int32
+	leaders      map[string]map[int32]int32
+	brokers      map[string]int32
+	t            TestReporter
 }
 
 func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
@@ -96,9 +97,17 @@ func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMet
 	return mmr
 }
 
+func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
+	mmr.controllerID = brokerID
+	return mmr
+}
+
 func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
 	metadataRequest := reqBody.(*MetadataRequest)
-	metadataResponse := &MetadataResponse{}
+	metadataResponse := &MetadataResponse{
+		Version:      metadataRequest.version(),
+		ControllerID: mmr.controllerID,
+	}
 	for addr, brokerID := range mmr.brokers {
 		metadataResponse.AddBroker(addr, brokerID)
 	}