|
|
@@ -42,7 +42,7 @@ func NewClient(id string, host string, port int32) (client *Client, err error) {
|
|
|
client.brokers = make(map[int32]*k.Broker)
|
|
|
client.leaders = make(map[string]map[int32]int32)
|
|
|
|
|
|
- // add it temporarily so that refreshTopics can find it
|
|
|
+ // add it to the set so that refreshTopics can find it
|
|
|
// brokers created through NewBroker() have an ID of -1, which won't conflict with
|
|
|
// whatever the metadata request returns
|
|
|
client.brokers[tmp.ID()] = tmp
|
|
|
@@ -54,10 +54,12 @@ func NewClient(id string, host string, port int32) (client *Client, err error) {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- // now remove our tmp broker - the successful metadata request will have returned it
|
|
|
- // with a valid ID, so it will already be in the hash somewhere else and we don't need
|
|
|
- // the incomplete tmp one anymore
|
|
|
- client.disconnectBroker(tmp)
|
|
|
+ // So apparently a kafka broker is not required to return its own address in response
|
|
|
+ // to a 'give me *all* the metadata request'... I'm not sure if that's because you're
|
|
|
+ // assumed to have it already or what. Regardless, this means that we can't assume we can
|
|
|
+ // disconnect our tmp broker here, since if it didn't return itself to us we want to keep
|
|
|
+ // it around anyways. The worst that happens is we end up with two connections to the same
|
|
|
+ // broker, one with ID -1 and one with the real ID.
|
|
|
|
|
|
return client, nil
|
|
|
}
|