Browse Source

Permit creating client with multiple broker addresses

Fixes #7

This does basically the simplest thing that works. It has some flaws, but most
importantly it gets the API in place and is basically functional. We can
improve it later without breaking API compatibility.

Remaining sub-optimal behaviour:
 * If the user provides three addresses, only the last of which is valid, we
   wait for the first two connections to completely fail (which may take several
   minutes depending on the TCP timeout) before we try the third. Trying them
   all in parallel and using the first valid one would be better.
 * Once we've had an address fail, we discard it forever. Over the lifetime of a
   long-running cluster, all of the nodes may be down at one point or another,
   meaning that eventually we may 'run out' of nodes and abort. We should
   probably just mark the address as recently-failed, and retry them after some
   time has passed. Only if all nodes have *recently* failed should we abort.
Evan Huus 11 years ago
parent
commit
34918e6fce
4 changed files with 51 additions and 42 deletions
  1. 38 30
      client.go
  2. 9 8
      client_test.go
  3. 2 2
      consumer_test.go
  4. 2 2
      producer_test.go

+ 38 - 30
client.go

@@ -17,17 +17,24 @@ type ClientConfig struct {
 // automatically when it passes out of scope. A single client can be safely shared by
 // multiple concurrent Producers and Consumers.
 type Client struct {
-	id      string
-	config  ClientConfig
+	id     string
+	config ClientConfig
+
+	// the broker addresses given to us through the constructor are not guaranteed to be returned in
+	// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
+	// so we store them separately
+	extraBrokerAddrs []string
+	extraBroker      *Broker
+
 	brokers map[int32]*Broker          // maps broker ids to brokers
 	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
 	lock    sync.RWMutex               // protects access to the maps, only one since they're always written together
 }
 
-// NewClient creates a new Client with the given client ID. It connects to the broker at the given
-// host:port address, and uses that broker to automatically fetch metadata on the rest of the kafka cluster.
-// If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
-func NewClient(id string, addr string, config *ClientConfig) (client *Client, err error) {
+// NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses
+// and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot
+// be retrieved from any of the given broker addresses, the client is not created.
+func NewClient(id string, addrs []string, config *ClientConfig) (client *Client, err error) {
 	if config == nil {
 		config = new(ClientConfig)
 	}
@@ -36,28 +43,20 @@ func NewClient(id string, addr string, config *ClientConfig) (client *Client, er
 		return nil, ConfigurationError("Invalid MetadataRetries")
 	}
 
-	tmp := NewBroker(addr)
-	err = tmp.Open()
-	if err != nil {
-		return nil, err
-	}
-	_, err = tmp.Connected()
-	if err != nil {
-		return nil, err
+	if len(addrs) < 1 {
+		return nil, ConfigurationError("You must provide at least one broker address")
 	}
 
 	client = new(Client)
 	client.id = id
 	client.config = *config
+	client.extraBrokerAddrs = addrs
+	client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
+	client.extraBroker.Open()
 
 	client.brokers = make(map[int32]*Broker)
 	client.leaders = make(map[string]map[int32]int32)
 
-	// add it to the set so that refreshTopics can find it
-	// brokers created through NewBroker() have an ID of -1, which won't conflict with
-	// whatever the metadata request returns
-	client.brokers[tmp.ID()] = tmp
-
 	// do an initial fetch of all cluster metadata by specifing an empty list of topics
 	err = client.refreshTopics(make([]string, 0), client.config.MetadataRetries)
 	if err != nil {
@@ -65,13 +64,6 @@ func NewClient(id string, addr string, config *ClientConfig) (client *Client, er
 		return nil, err
 	}
 
-	// So apparently a kafka broker is not required to return its own address in response
-	// to a 'give me *all* the metadata request'... I'm not sure if that's because you're
-	// assumed to have it already or what. Regardless, this means that we can't assume we can
-	// disconnect our tmp broker here, since if it didn't return itself to us we want to keep
-	// it around anyways. The worst that happens is we end up with two connections to the same
-	// broker, one with ID -1 and one with the real ID.
-
 	return client, nil
 }
 
@@ -87,6 +79,11 @@ func (client *Client) Close() error {
 	}
 	client.brokers = nil
 	client.leaders = nil
+
+	if client.extraBroker != nil {
+		go client.extraBroker.Close()
+	}
+
 	return nil
 }
 
@@ -133,9 +130,20 @@ func (client *Client) disconnectBroker(broker *Broker) {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
-	// we don't need to update the leaders hash, it will automatically get refreshed next time because
-	// the broker lookup will return nil
-	delete(client.brokers, broker.ID())
+	if broker == client.extraBroker {
+		client.extraBrokerAddrs = client.extraBrokerAddrs[1:]
+		if len(client.extraBrokerAddrs) > 0 {
+			client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
+			client.extraBroker.Open()
+		} else {
+			client.extraBroker = nil
+		}
+	} else {
+		// we don't need to update the leaders hash, it will automatically get refreshed next time because
+		// the broker lookup will return nil
+		delete(client.brokers, broker.ID())
+	}
+
 	go broker.Close()
 }
 
@@ -188,7 +196,7 @@ func (client *Client) any() *Broker {
 		return broker
 	}
 
-	return nil
+	return client.extraBroker
 }
 
 func (client *Client) cachedLeader(topic string, partition_id int32) *Broker {

+ 9 - 8
client_test.go

@@ -13,7 +13,7 @@ func TestSimpleClient(t *testing.T) {
 	// Only one response needed, an empty metadata response
 	responses <- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
 
-	client, err := NewClient("clientID", mockBroker.Addr(), nil)
+	client, err := NewClient("clientID", []string{mockBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -37,7 +37,7 @@ func TestClientExtraBrokers(t *testing.T) {
 	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
 	responses <- response
 
-	client, err := NewClient("clientID", mockBroker.Addr(), nil)
+	client, err := NewClient("clientID", []string{mockBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -70,7 +70,7 @@ func TestClientMetadata(t *testing.T) {
 	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
 	responses <- response
 
-	client, err := NewClient("clientID", mockBroker.Addr(), nil)
+	client, err := NewClient("clientID", []string{mockBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -92,9 +92,10 @@ func TestClientMetadata(t *testing.T) {
 }
 
 func TestClientRefreshBehaviour(t *testing.T) {
-	responses := make(chan []byte, 3)
+	responses := make(chan []byte, 1)
+	extraResponses := make(chan []byte, 2)
 	mockBroker := NewMockBroker(t, responses)
-	mockExtra := NewMockBroker(t, make(chan []byte))
+	mockExtra := NewMockBroker(t, extraResponses)
 	defer mockBroker.Close()
 	defer mockExtra.Close()
 
@@ -107,7 +108,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 		0x00, 0x00, 0x00, 0x00}
 	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
 	responses <- response
-	responses <- []byte{
+	extraResponses <- []byte{
 		0x00, 0x00, 0x00, 0x00,
 		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x00,
@@ -118,7 +119,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 		0xFF, 0xFF, 0xFF, 0xFF,
 		0x00, 0x00, 0x00, 0x00,
 		0x00, 0x00, 0x00, 0x00}
-	responses <- []byte{
+	extraResponses <- []byte{
 		0x00, 0x00, 0x00, 0x00,
 		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x00,
@@ -130,7 +131,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 		0x00, 0x00, 0x00, 0x00,
 		0x00, 0x00, 0x00, 0x00}
 
-	client, err := NewClient("clientID", mockBroker.Addr(), &ClientConfig{MetadataRetries: 1})
+	client, err := NewClient("clientID", []string{mockBroker.Addr()}, &ClientConfig{MetadataRetries: 1})
 	if err != nil {
 		t.Fatal(err)
 	}

+ 2 - 2
consumer_test.go

@@ -64,7 +64,7 @@ func TestSimpleConsumer(t *testing.T) {
 			0x00, 0x00, 0x00, 0x00}
 	}()
 
-	client, err := NewClient("clientID", mockBroker.Addr(), nil)
+	client, err := NewClient("clientID", []string{mockBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -89,7 +89,7 @@ func TestSimpleConsumer(t *testing.T) {
 }
 
 func ExampleConsumer() {
-	client, err := NewClient("myClient", "localhost:9092", nil)
+	client, err := NewClient("myClient", []string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	} else {

+ 2 - 2
producer_test.go

@@ -45,7 +45,7 @@ func TestSimpleProducer(t *testing.T) {
 		}
 	}()
 
-	client, err := NewClient("clientID", mockBroker.Addr(), nil)
+	client, err := NewClient("clientID", []string{mockBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -66,7 +66,7 @@ func TestSimpleProducer(t *testing.T) {
 }
 
 func ExampleProducer() {
-	client, err := NewClient("myClient", "localhost:9092", nil)
+	client, err := NewClient("myClient", []string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	} else {