|
|
@@ -80,7 +80,8 @@ func (client *Client) Close() error {
|
|
|
defer client.lock.Unlock()
|
|
|
|
|
|
for _, broker := range client.brokers {
|
|
|
- go withRecover(func() { broker.Close() })
|
|
|
+ myBroker := broker // NB: block-local prevents clobbering
|
|
|
+ go withRecover(func() { myBroker.Close() })
|
|
|
}
|
|
|
client.brokers = nil
|
|
|
client.leaders = nil
|
|
|
@@ -180,7 +181,8 @@ func (client *Client) disconnectBroker(broker *Broker) {
|
|
|
delete(client.brokers, broker.ID())
|
|
|
}
|
|
|
|
|
|
- go withRecover(func() { broker.Close() })
|
|
|
+ myBroker := broker // NB: block-local prevents clobbering
|
|
|
+ go withRecover(func() { myBroker.Close() })
|
|
|
}
|
|
|
|
|
|
func (client *Client) refreshMetadata(topics []string, retries int) error {
|
|
|
@@ -286,7 +288,8 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
|
|
|
client.brokers[broker.ID()] = broker
|
|
|
Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr())
|
|
|
} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
|
|
|
- go withRecover(func() { client.brokers[broker.ID()].Close() })
|
|
|
+ myBroker := client.brokers[broker.ID()] // use block-local to prevent clobbering `broker` for Gs
|
|
|
+ go withRecover(func() { myBroker.Close() })
|
|
|
broker.Open(client.config.ConcurrencyPerBroker)
|
|
|
client.brokers[broker.ID()] = broker
|
|
|
Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
|