Browse Source

Add option to refresh metadata on a timer

Default to every 10 minutes to match behaviour of jvm reference implementation.

As discussed on #144
Evan Huus 11 years ago
parent
commit
b7417e61b0
1 changed files with 30 additions and 5 deletions
  1. 30 5
      client.go

+ 30 - 5
client.go

@@ -8,9 +8,10 @@ import (
 
 // ClientConfig is used to pass multiple configuration options to NewClient.
 type ClientConfig struct {
-	MetadataRetries   int           // How many times to retry a metadata request when a partition is in the middle of leader election.
-	WaitForElection   time.Duration // How long to wait for leader election to finish between retries.
-	DefaultBrokerConf *BrokerConfig // Default configuration for broker connections created by this client.
+	MetadataRetries            int           // How many times to retry a metadata request when a partition is in the middle of leader election.
+	WaitForElection            time.Duration // How long to wait for leader election to finish between retries.
+	DefaultBrokerConf          *BrokerConfig // Default configuration for broker connections created by this client.
+	BackgroundRefreshFrequency time.Duration // How frequently the client will refresh the cluster metadata in the background. Defaults to 10 minutes. Set to 0 to disable.
 }
 
 // Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
@@ -67,6 +68,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 		client.Close()
 		return nil, err
 	}
+	go withRecover(client.backgroundMetadataUpdater)
 
 	Logger.Println("Successfully initialized new client")
 
@@ -374,6 +376,24 @@ func (client *Client) cachedPartitions(topic string) []int32 {
 	return ret
 }
 
+func (client *Client) backgroundMetadataUpdater() {
+	if client.config.BackgroundRefreshFrequency == time.Duration(0) {
+		return
+	}
+
+	ticker := time.NewTicker(client.config.BackgroundRefreshFrequency)
+	for _ = range ticker.C {
+		if client.Closed() {
+			ticker.Stop()
+			return
+		}
+		err := client.RefreshAllMetadata()
+		if err != nil {
+			Logger.Print("Client background metadata update: ", err)
+		}
+	}
+}
+
 // if no fatal error, returns a list of topics that need retrying due to LeaderNotAvailable
 func (client *Client) update(data *MetadataResponse) ([]string, error) {
 	client.lock.Lock()
@@ -435,8 +455,9 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 // NewClientConfig creates a new ClientConfig instance with sensible defaults
 func NewClientConfig() *ClientConfig {
 	return &ClientConfig{
-		MetadataRetries: 3,
-		WaitForElection: 250 * time.Millisecond,
+		MetadataRetries:            3,
+		WaitForElection:            250 * time.Millisecond,
+		BackgroundRefreshFrequency: 10 * time.Minute,
 	}
 }
 
@@ -457,5 +478,9 @@ func (config *ClientConfig) Validate() error {
 		}
 	}
 
+	if config.BackgroundRefreshFrequency < time.Duration(0) {
+		return ConfigurationError("Invalid BackgroundRefreshFrequency.")
+	}
+
 	return nil
 }