Browse Source

Add method to deregister all connected brokers and provide new seedbroker to fetch metadata

Justin Chen 3 years ago
parent
commit
654bd01691
2 changed files with 60 additions and 0 deletions
  1. 29 0
      client.go
  2. 31 0
      client_test.go

+ 29 - 0
client.go

@@ -56,6 +56,11 @@ type Client interface {
 	// partition. Offline replicas are replicas which are offline
 	OfflineReplicas(topic string, partitionID int32) ([]int32, error)
 
+	// RefreshBrokers takes a list of addresses to be used as seed brokers.
+	// Existing broker connections are closed and the updated list of seed brokers
+	// will be used for the next metadata fetch.
+	RefreshBrokers(addrs []string) error
+
 	// RefreshMetadata takes a list of topics and queries the cluster to refresh the
 	// available metadata for those topics. If no topics are provided, it will refresh
 	// metadata for all topics.
@@ -429,6 +434,30 @@ func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
 	return leader, err
 }
 
+func (client *client) RefreshBrokers(addrs []string) error {
+	if client.Closed() {
+		return ErrClosedClient
+	}
+
+	client.lock.Lock()
+	defer client.lock.Unlock()
+
+	for _, broker := range client.brokers {
+		_ = broker.Close()
+		delete(client.brokers, broker.ID())
+	}
+
+	client.seedBrokers = nil
+	client.deadSeeds = nil
+
+	random := rand.New(rand.NewSource(time.Now().UnixNano()))
+	for _, index := range random.Perm(len(addrs)) {
+		client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
+	}
+
+	return nil
+}
+
 func (client *client) RefreshMetadata(topics ...string) error {
 	if client.Closed() {
 		return ErrClosedClient

+ 31 - 0
client_test.go

@@ -515,6 +515,37 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	safeClose(t, client)
 }
 
+func TestClientRefreshBrokers(t *testing.T) {
+	initialSeed := NewMockBroker(t, 0)
+	leader := NewMockBroker(t, 5)
+
+	metadataResponse1 := new(MetadataResponse)
+	metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse1.AddBroker(initialSeed.Addr(), initialSeed.BrokerID())
+	initialSeed.Returns(metadataResponse1)
+
+	c, err := NewClient([]string{initialSeed.Addr()}, nil)
+	client := c.(*client)
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(client.Brokers()) != 2 {
+		t.Error("Meta broker is not 2")
+	}
+
+	newSeedBrokers := []string{"localhost:12345"}
+	_ = client.RefreshBrokers(newSeedBrokers)
+
+	if client.seedBrokers[0].addr != newSeedBrokers[0] {
+		t.Error("Seed broker not updated")
+	}
+	if len(client.Brokers()) != 0 {
+		t.Error("Old brokers not closed")
+	}
+}
+
 func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	leader := NewMockBroker(t, 5)