|
|
@@ -70,8 +70,8 @@ const (
|
|
|
)
|
|
|
|
|
|
type client struct {
|
|
|
- conf *Config
|
|
|
- closer chan none
|
|
|
+ conf *Config
|
|
|
+ closer, closed chan none // for shutting down background metadata updater
|
|
|
|
|
|
// the broker addresses given to us through the constructor are not guaranteed to be returned in
|
|
|
// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
|
|
|
@@ -111,6 +111,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
|
|
|
client := &client{
|
|
|
conf: conf,
|
|
|
closer: make(chan none),
|
|
|
+ closed: make(chan none),
|
|
|
brokers: make(map[int32]*Broker),
|
|
|
metadata: make(map[string]map[int32]*PartitionMetadata),
|
|
|
cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
|
|
|
@@ -129,6 +130,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
|
|
|
// indicates that maybe part of the cluster is down, but is not fatal to creating the client
|
|
|
Logger.Println(err)
|
|
|
default:
|
|
|
+ close(client.closed) // we haven't started the background updater yet, so we have to do this manually
|
|
|
_ = client.Close()
|
|
|
return nil, err
|
|
|
}
|
|
|
@@ -151,6 +153,10 @@ func (client *client) Close() error {
|
|
|
return ErrClosedClient
|
|
|
}
|
|
|
|
|
|
+ // shutdown and wait for the background thread before we take the lock, to avoid races
|
|
|
+ close(client.closer)
|
|
|
+ <-client.closed
|
|
|
+
|
|
|
client.lock.Lock()
|
|
|
defer client.lock.Unlock()
|
|
|
Logger.Println("Closing Client")
|
|
|
@@ -166,8 +172,6 @@ func (client *client) Close() error {
|
|
|
client.brokers = nil
|
|
|
client.metadata = nil
|
|
|
|
|
|
- close(client.closer)
|
|
|
-
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
@@ -530,11 +534,15 @@ func (client *client) getOffset(topic string, partitionID int32, time int64) (in
|
|
|
// core metadata update logic
|
|
|
|
|
|
func (client *client) backgroundMetadataUpdater() {
|
|
|
+ defer close(client.closed)
|
|
|
+
|
|
|
if client.conf.Metadata.RefreshFrequency == time.Duration(0) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency)
|
|
|
+ defer ticker.Stop()
|
|
|
+
|
|
|
for {
|
|
|
select {
|
|
|
case <-ticker.C:
|
|
|
@@ -542,7 +550,6 @@ func (client *client) backgroundMetadataUpdater() {
|
|
|
Logger.Println("Client background metadata update:", err)
|
|
|
}
|
|
|
case <-client.closer:
|
|
|
- ticker.Stop()
|
|
|
return
|
|
|
}
|
|
|
}
|