ソースを参照

Lazily connect to brokers in the client

Instead of opening a connection to all brokers immediately upon receiving their
information in metadata, wait until we are asked for them either via a call to
`Leader` or a call to `any`.
Evan Huus 10 年 前
コミット
533cce206d
2 ファイル変更7 行追加13 行削除
  1. 3 5
      client.go
  2. 4 8
      client_test.go

+ 3 - 5
client.go

@@ -337,10 +337,12 @@ func (client *Client) any() *Broker {
 	defer client.lock.RUnlock()
 
 	if client.seedBroker != nil {
+		_ = client.seedBroker.Open(client.conf)
 		return client.seedBroker
 	}
 
 	for _, broker := range client.brokers {
+		_ = broker.Open(client.conf)
 		return broker
 	}
 
@@ -436,6 +438,7 @@ func (client *Client) cachedLeader(topic string, partitionID int32) (*Broker, er
 			if b == nil {
 				return nil, ErrLeaderNotAvailable
 			}
+			_ = b.Open(client.conf)
 			return b, nil
 		}
 	}
@@ -538,17 +541,12 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 	// - if it is a new ID, save it
 	// - if it is an existing ID, but the address we have is stale, discard the old one and save it
 	// - otherwise ignore it, replacing our existing one would just bounce the connection
-	// We asynchronously try to open connections to the new brokers. We don't care if they
-	// fail, since maybe that broker is unreachable but doesn't have a topic we care about.
-	// If it fails and we do care, whoever tries to use it will get the connection error.
 	for _, broker := range data.Brokers {
 		if client.brokers[broker.ID()] == nil {
-			_ = broker.Open(client.conf)
 			client.brokers[broker.ID()] = broker
 			Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr())
 		} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
 			safeAsyncClose(client.brokers[broker.ID()])
-			_ = broker.Open(client.conf)
 			client.brokers[broker.ID()] = broker
 			Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
 		}

+ 4 - 8
client_test.go

@@ -28,15 +28,14 @@ func TestSimpleClient(t *testing.T) {
 
 func TestCachedPartitions(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
-	leader := newMockBroker(t, 5)
 
 	replicas := []int32{3, 1, 5}
 	isr := []int32{5, 1}
 
 	metadataResponse := new(MetadataResponse)
-	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, ErrNoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, ErrLeaderNotAvailable)
+	metadataResponse.AddBroker("localhost:12345", 2)
+	metadataResponse.AddTopicPartition("my_topic", 0, 2, replicas, isr, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, 2, replicas, isr, ErrLeaderNotAvailable)
 	seedBroker.Returns(metadataResponse)
 
 	config := NewConfig()
@@ -61,17 +60,15 @@ func TestCachedPartitions(t *testing.T) {
 		t.Fatal("Not using the cache!")
 	}
 
-	leader.Close()
 	seedBroker.Close()
 	safeClose(t, client)
 }
 
 func TestClientSeedBrokers(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
-	discoveredBroker := newMockBroker(t, 2)
 
 	metadataResponse := new(MetadataResponse)
-	metadataResponse.AddBroker(discoveredBroker.Addr(), discoveredBroker.BrokerID())
+	metadataResponse.AddBroker("localhost:12345", 2)
 	seedBroker.Returns(metadataResponse)
 
 	client, err := NewClient([]string{seedBroker.Addr()}, nil)
@@ -79,7 +76,6 @@ func TestClientSeedBrokers(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	discoveredBroker.Close()
 	seedBroker.Close()
 	safeClose(t, client)
 }