|
|
@@ -21,6 +21,7 @@ type ClientConfig struct {
|
|
|
type Client struct {
|
|
|
id string
|
|
|
config ClientConfig
|
|
|
+ closer chan struct{}
|
|
|
|
|
|
// the broker addresses given to us through the constructor are not guaranteed to be returned in
|
|
|
// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
|
|
|
@@ -55,6 +56,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
|
|
|
client := &Client{
|
|
|
id: id,
|
|
|
config: *config,
|
|
|
+ closer: make(chan struct{}),
|
|
|
seedBrokerAddrs: addrs,
|
|
|
seedBroker: NewBroker(addrs[0]),
|
|
|
deadBrokerAddrs: make(map[string]struct{}),
|
|
|
@@ -108,6 +110,8 @@ func (client *Client) Close() error {
|
|
|
safeAsyncClose(client.seedBroker)
|
|
|
}
|
|
|
|
|
|
+ close(client.closer)
|
|
|
+
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
@@ -383,15 +387,16 @@ func (client *Client) backgroundMetadataUpdater() {
|
|
|
}
|
|
|
|
|
|
ticker := time.NewTicker(client.config.BackgroundRefreshFrequency)
|
|
|
- for _ = range ticker.C {
|
|
|
- if client.Closed() {
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-ticker.C:
|
|
|
+ if err := client.RefreshAllMetadata(); err != nil {
|
|
|
+ Logger.Println("Client background metadata update:", err)
|
|
|
+ }
|
|
|
+ case <-client.closer:
|
|
|
ticker.Stop()
|
|
|
return
|
|
|
}
|
|
|
- err := client.RefreshAllMetadata()
|
|
|
- if err != nil {
|
|
|
- Logger.Print("Client background metadata update: ", err)
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
|