|
|
@@ -17,17 +17,24 @@ type ClientConfig struct {
|
|
|
// automatically when it passes out of scope. A single client can be safely shared by
|
|
|
// multiple concurrent Producers and Consumers.
|
|
|
type Client struct {
|
|
|
- id string
|
|
|
- config ClientConfig
|
|
|
+ id string
|
|
|
+ config ClientConfig
|
|
|
+
|
|
|
+ // the broker addresses given to us through the constructor are not guaranteed to be returned in
|
|
|
+ // the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
|
|
|
+ // so we store them separately
|
|
|
+ extraBrokerAddrs []string
|
|
|
+ extraBroker *Broker
|
|
|
+
|
|
|
brokers map[int32]*Broker // maps broker ids to brokers
|
|
|
leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
|
|
|
lock sync.RWMutex // protects access to the maps, only one since they're always written together
|
|
|
}
|
|
|
|
|
|
-// NewClient creates a new Client with the given client ID. It connects to the broker at the given
|
|
|
-// host:port address, and uses that broker to automatically fetch metadata on the rest of the kafka cluster.
|
|
|
-// If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
|
|
|
-func NewClient(id string, addr string, config *ClientConfig) (client *Client, err error) {
|
|
|
+// NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses
|
|
|
+// and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot
|
|
|
+// be retrieved from any of the given broker addresses, the client is not created.
|
|
|
+func NewClient(id string, addrs []string, config *ClientConfig) (client *Client, err error) {
|
|
|
if config == nil {
|
|
|
config = new(ClientConfig)
|
|
|
}
|
|
|
@@ -36,28 +43,20 @@ func NewClient(id string, addr string, config *ClientConfig) (client *Client, er
|
|
|
return nil, ConfigurationError("Invalid MetadataRetries")
|
|
|
}
|
|
|
|
|
|
- tmp := NewBroker(addr)
|
|
|
- err = tmp.Open()
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- _, err = tmp.Connected()
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
+ if len(addrs) < 1 {
|
|
|
+ return nil, ConfigurationError("You must provide at least one broker address")
|
|
|
}
|
|
|
|
|
|
client = new(Client)
|
|
|
client.id = id
|
|
|
client.config = *config
|
|
|
+ client.extraBrokerAddrs = addrs
|
|
|
+ client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
|
|
|
+ client.extraBroker.Open()
|
|
|
|
|
|
client.brokers = make(map[int32]*Broker)
|
|
|
client.leaders = make(map[string]map[int32]int32)
|
|
|
|
|
|
- // add it to the set so that refreshTopics can find it
|
|
|
- // brokers created through NewBroker() have an ID of -1, which won't conflict with
|
|
|
- // whatever the metadata request returns
|
|
|
- client.brokers[tmp.ID()] = tmp
|
|
|
-
|
|
|
// do an initial fetch of all cluster metadata by specifing an empty list of topics
|
|
|
err = client.refreshTopics(make([]string, 0), client.config.MetadataRetries)
|
|
|
if err != nil {
|
|
|
@@ -65,13 +64,6 @@ func NewClient(id string, addr string, config *ClientConfig) (client *Client, er
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- // So apparently a kafka broker is not required to return its own address in response
|
|
|
- // to a 'give me *all* the metadata request'... I'm not sure if that's because you're
|
|
|
- // assumed to have it already or what. Regardless, this means that we can't assume we can
|
|
|
- // disconnect our tmp broker here, since if it didn't return itself to us we want to keep
|
|
|
- // it around anyways. The worst that happens is we end up with two connections to the same
|
|
|
- // broker, one with ID -1 and one with the real ID.
|
|
|
-
|
|
|
return client, nil
|
|
|
}
|
|
|
|
|
|
@@ -87,6 +79,11 @@ func (client *Client) Close() error {
|
|
|
}
|
|
|
client.brokers = nil
|
|
|
client.leaders = nil
|
|
|
+
|
|
|
+ if client.extraBroker != nil {
|
|
|
+ go client.extraBroker.Close()
|
|
|
+ }
|
|
|
+
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
@@ -133,9 +130,20 @@ func (client *Client) disconnectBroker(broker *Broker) {
|
|
|
client.lock.Lock()
|
|
|
defer client.lock.Unlock()
|
|
|
|
|
|
- // we don't need to update the leaders hash, it will automatically get refreshed next time because
|
|
|
- // the broker lookup will return nil
|
|
|
- delete(client.brokers, broker.ID())
|
|
|
+ if broker == client.extraBroker {
|
|
|
+ client.extraBrokerAddrs = client.extraBrokerAddrs[1:]
|
|
|
+ if len(client.extraBrokerAddrs) > 0 {
|
|
|
+ client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
|
|
|
+ client.extraBroker.Open()
|
|
|
+ } else {
|
|
|
+ client.extraBroker = nil
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // we don't need to update the leaders hash, it will automatically get refreshed next time because
|
|
|
+ // the broker lookup will return nil
|
|
|
+ delete(client.brokers, broker.ID())
|
|
|
+ }
|
|
|
+
|
|
|
go broker.Close()
|
|
|
}
|
|
|
|
|
|
@@ -188,7 +196,7 @@ func (client *Client) any() *Broker {
|
|
|
return broker
|
|
|
}
|
|
|
|
|
|
- return nil
|
|
|
+ return client.extraBroker
|
|
|
}
|
|
|
|
|
|
func (client *Client) cachedLeader(topic string, partition_id int32) *Broker {
|