|
@@ -257,27 +257,22 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
|
|
|
client.lock.Lock()
|
|
client.lock.Lock()
|
|
|
defer client.lock.Unlock()
|
|
defer client.lock.Unlock()
|
|
|
|
|
|
|
|
- // First discard brokers that we already know about. This avoids bouncing TCP connections,
|
|
|
|
|
- // and especially avoids closing valid connections out from under other code which may be trying
|
|
|
|
|
- // to use them.
|
|
|
|
|
- var newBrokers []*Broker
|
|
|
|
|
- for _, broker := range data.Brokers {
|
|
|
|
|
- if !broker.Equals(client.brokers[broker.ID()]) {
|
|
|
|
|
- newBrokers = append(newBrokers, broker)
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // Now asynchronously try to open connections to the new brokers. We don't care if they
|
|
|
|
|
|
|
+ // For all the brokers we received:
|
|
|
|
|
+ // - if it is a new ID, save it
|
|
|
|
|
+ // - if it is an existing ID, but the address we have is stale, discard the old one and save it
|
|
|
|
|
+ // - otherwise ignore it, replacing our existing one would just bounce the connection
|
|
|
|
|
+ // We asynchronously try to open connections to the new brokers. We don't care if they
|
|
|
// fail, since maybe that broker is unreachable but doesn't have a topic we care about.
|
|
// fail, since maybe that broker is unreachable but doesn't have a topic we care about.
|
|
|
// If it fails and we do care, whoever tries to use it will get the connection error.
|
|
// If it fails and we do care, whoever tries to use it will get the connection error.
|
|
|
- // If we have an old broker with that ID (but a different host/port, since they didn't
|
|
|
|
|
- // compare as equals above) then close and remove that broker before saving the new one.
|
|
|
|
|
- for _, broker := range newBrokers {
|
|
|
|
|
- if client.brokers[broker.ID()] != nil {
|
|
|
|
|
|
|
+ for _, broker := range data.Brokers {
|
|
|
|
|
+ if client.brokers[broker.ID()] == nil {
|
|
|
|
|
+ broker.Open()
|
|
|
|
|
+ client.brokers[broker.ID()] = broker
|
|
|
|
|
+ } else if broker.Addr() != client.brokers[broker.ID()].Addr() {
|
|
|
go client.brokers[broker.ID()].Close()
|
|
go client.brokers[broker.ID()].Close()
|
|
|
|
|
+ broker.Open()
|
|
|
|
|
+ client.brokers[broker.ID()] = broker
|
|
|
}
|
|
}
|
|
|
- broker.Open()
|
|
|
|
|
- client.brokers[broker.ID()] = broker
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
toRetry := make(map[string]bool)
|
|
toRetry := make(map[string]bool)
|