Browse Source

Merge pull request #1645 from Stephan14/bugfix/remove_invalid_broker

Remove broker(s) which no longer exist in metadata
Dominic Evans 5 years ago
parent
commit
2d3d6cd11c
3 changed files with 68 additions and 7 deletions
  1. 26 3
      client.go
  2. 33 2
      client_test.go
  3. 9 2
      consumer_test.go

+ 26 - 3
client.go

@@ -562,6 +562,30 @@ func (client *client) RefreshCoordinator(consumerGroup string) error {
 
 
 // private broker management helpers
 // private broker management helpers
 
 
+func (client *client) updateBroker(brokers []*Broker) {
+	var currentBroker = make(map[int32]*Broker, len(brokers))
+
+	for _, broker := range brokers {
+		currentBroker[broker.ID()] = broker
+		if client.brokers[broker.ID()] == nil { // add new broker
+			client.brokers[broker.ID()] = broker
+			Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
+		} else if broker.Addr() != client.brokers[broker.ID()].Addr() { // replace broker with new address
+			safeAsyncClose(client.brokers[broker.ID()])
+			client.brokers[broker.ID()] = broker
+			Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr())
+		}
+	}
+
+	for id, broker := range client.brokers {
+		if _, exist := currentBroker[id]; !exist { // remove old broker
+			safeAsyncClose(broker)
+			delete(client.brokers, id)
+			Logger.Printf("client/broker remove invalid broker #%d with %s", broker.ID(), broker.Addr())
+		}
+	}
+}
+
 // registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
 // registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
 // in the brokers map. It returns the broker that is registered, which may be the provided broker,
 // in the brokers map. It returns the broker that is registered, which may be the provided broker,
 // or a previously registered Broker instance. You must hold the write lock before calling this function.
 // or a previously registered Broker instance. You must hold the write lock before calling this function.
@@ -885,10 +909,9 @@ func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bo
 	// For all the brokers we received:
 	// For all the brokers we received:
 	// - if it is a new ID, save it
 	// - if it is a new ID, save it
 	// - if it is an existing ID, but the address we have is stale, discard the old one and save it
 	// - if it is an existing ID, but the address we have is stale, discard the old one and save it
+	// - if some brokers is not exist in it, remove old broker
 	// - otherwise ignore it, replacing our existing one would just bounce the connection
 	// - otherwise ignore it, replacing our existing one would just bounce the connection
-	for _, broker := range data.Brokers {
-		client.registerBroker(broker)
-	}
+	client.updateBroker(data.Brokers)
 
 
 	client.controllerID = data.ControllerID
 	client.controllerID = data.ControllerID
 
 

+ 33 - 2
client_test.go

@@ -441,6 +441,8 @@ func TestClientReceivingPartialMetadata(t *testing.T) {
 	replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()}
 	replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()}
 
 
 	metadataPartial := new(MetadataResponse)
 	metadataPartial := new(MetadataResponse)
+	metadataPartial.AddBroker(seedBroker.Addr(), 1)
+	metadataPartial.AddBroker(leader.Addr(), 5)
 	metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable)
 	metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable)
 	metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, []int32{}, ErrNoError)
 	metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, []int32{}, ErrNoError)
 	metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, []int32{}, ErrLeaderNotAvailable)
 	metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, []int32{}, ErrLeaderNotAvailable)
@@ -485,6 +487,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	seedBroker.Returns(metadataResponse1)
 	seedBroker.Returns(metadataResponse1)
 
 
 	metadataResponse2 := new(MetadataResponse)
 	metadataResponse2 := new(MetadataResponse)
+	metadataResponse2.AddBroker(leader.Addr(), leader.BrokerID())
 	metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse2)
 	seedBroker.Returns(metadataResponse2)
 
 
@@ -512,6 +515,36 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	safeClose(t, client)
 	safeClose(t, client)
 }
 }
 
 
+func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	leader := NewMockBroker(t, 5)
+
+	metadataResponse1 := new(MetadataResponse)
+	metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
+	seedBroker.Returns(metadataResponse1)
+
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(client.Brokers()) != 2 {
+		t.Error("Meta broker is not 2")
+	}
+
+	metadataResponse2 := new(MetadataResponse)
+	metadataResponse2.AddBroker(leader.Addr(), leader.BrokerID())
+	seedBroker.Returns(metadataResponse2)
+
+	if err := client.RefreshMetadata(); err != nil {
+		t.Error(err)
+	}
+	if len(client.Brokers()) != 1 {
+		t.Error("Meta broker is not 1")
+	}
+}
+
 func TestClientResurrectDeadSeeds(t *testing.T) {
 func TestClientResurrectDeadSeeds(t *testing.T) {
 	initialSeed := NewMockBroker(t, 0)
 	initialSeed := NewMockBroker(t, 0)
 	emptyMetadata := new(MetadataResponse)
 	emptyMetadata := new(MetadataResponse)
@@ -656,7 +689,6 @@ func TestClientMetadataTimeout(t *testing.T) {
 
 
 			// Start refreshing metadata in the background
 			// Start refreshing metadata in the background
 			errChan := make(chan error)
 			errChan := make(chan error)
-			start := time.Now()
 			go func() {
 			go func() {
 				errChan <- c.RefreshMetadata()
 				errChan <- c.RefreshMetadata()
 			}()
 			}()
@@ -666,7 +698,6 @@ func TestClientMetadataTimeout(t *testing.T) {
 			maxRefreshDuration := 2 * timeout
 			maxRefreshDuration := 2 * timeout
 			select {
 			select {
 			case err := <-errChan:
 			case err := <-errChan:
-				t.Logf("Got err: %v after waiting for: %v", err, time.Since(start))
 				if err == nil {
 				if err == nil {
 					t.Fatal("Expected failed RefreshMetadata, got nil")
 					t.Fatal("Expected failed RefreshMetadata, got nil")
 				}
 				}

+ 9 - 2
consumer_test.go

@@ -640,6 +640,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 		"MetadataRequest": NewMockMetadataResponse(t).
 		"MetadataRequest": NewMockMetadataResponse(t).
 			SetBroker(leader0.Addr(), leader0.BrokerID()).
 			SetBroker(leader0.Addr(), leader0.BrokerID()).
 			SetBroker(leader1.Addr(), leader1.BrokerID()).
 			SetBroker(leader1.Addr(), leader1.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
 			SetLeader("my_topic", 0, leader0.BrokerID()).
 			SetLeader("my_topic", 0, leader0.BrokerID()).
 			SetLeader("my_topic", 1, leader1.BrokerID()),
 			SetLeader("my_topic", 1, leader1.BrokerID()),
 	})
 	})
@@ -720,7 +721,10 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	seedBroker.SetHandlerByMap(map[string]MockResponse{
 	seedBroker.SetHandlerByMap(map[string]MockResponse{
 		"MetadataRequest": NewMockMetadataResponse(t).
 		"MetadataRequest": NewMockMetadataResponse(t).
 			SetLeader("my_topic", 0, leader1.BrokerID()).
 			SetLeader("my_topic", 0, leader1.BrokerID()).
-			SetLeader("my_topic", 1, leader1.BrokerID()),
+			SetLeader("my_topic", 1, leader1.BrokerID()).
+			SetBroker(leader0.Addr(), leader0.BrokerID()).
+			SetBroker(leader1.Addr(), leader1.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
 	})
 	})
 
 
 	// leader0 says no longer leader of partition 0
 	// leader0 says no longer leader of partition 0
@@ -759,7 +763,10 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	seedBroker.SetHandlerByMap(map[string]MockResponse{
 	seedBroker.SetHandlerByMap(map[string]MockResponse{
 		"MetadataRequest": NewMockMetadataResponse(t).
 		"MetadataRequest": NewMockMetadataResponse(t).
 			SetLeader("my_topic", 0, leader1.BrokerID()).
 			SetLeader("my_topic", 0, leader1.BrokerID()).
-			SetLeader("my_topic", 1, leader0.BrokerID()),
+			SetLeader("my_topic", 1, leader0.BrokerID()).
+			SetBroker(leader0.Addr(), leader0.BrokerID()).
+			SetBroker(leader1.Addr(), leader1.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
 	})
 	})
 
 
 	// leader1 provides three more messages on partition0, says no longer leader of partition1
 	// leader1 provides three more messages on partition0, says no longer leader of partition1