
Add a config option to store only minimal metadata

The default is to fetch and store metadata for all topics, as it is
simpler and more convenient not to have to go to the network for every
new topic that is needed. However, this can use a lot of memory on
clusters with many topics and partitions, so add a flag which can be
unset to store only the minimal topic metadata needed so far.
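
For clusters where the full metadata set is too heavy, the flag can be unset
before creating the client. A minimal sketch of that usage; the broker address
and topic name are placeholders, and it assumes the client's usual behaviour of
fetching metadata for an unknown topic on demand when Partitions is called:

	package main

	import (
		"log"

		"github.com/Shopify/sarama"
	)

	func main() {
		config := sarama.NewConfig()
		// Only keep metadata for topics this client actually uses,
		// instead of fetching the whole cluster's topic list up front.
		config.Metadata.Full = false

		client, err := sarama.NewClient([]string{"localhost:9092"}, config)
		if err != nil {
			log.Fatalln(err)
		}
		defer client.Close()

		// The first request for an unknown topic fetches metadata for
		// just that topic, which is then kept refreshed in the background.
		partitions, err := client.Partitions("my-topic")
		if err != nil {
			log.Fatalln(err)
		}
		log.Println("partitions of my-topic:", partitions)
	}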
Evan Huus, 8 years ago (commit 38673c0ce0)
2 changed files with 32 additions and 13 deletions:
  1. client.go  (+25 -13)
  2. config.go  (+7 -0)

client.go  (+25 -13)

@@ -141,18 +141,20 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
 		client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
 	}
 
-	// do an initial fetch of all cluster metadata by specifying an empty list of topics
-	err := client.RefreshMetadata()
-	switch err {
-	case nil:
-		break
-	case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
-		// indicates that maybe part of the cluster is down, but is not fatal to creating the client
-		Logger.Println(err)
-	default:
-		close(client.closed) // we haven't started the background updater yet, so we have to do this manually
-		_ = client.Close()
-		return nil, err
+	if conf.Metadata.Full {
+		// do an initial fetch of all cluster metadata by specifying an empty list of topics
+		err := client.RefreshMetadata()
+		switch err {
+		case nil:
+			break
+		case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
+			// indicates that maybe part of the cluster is down, but is not fatal to creating the client
+			Logger.Println(err)
+		default:
+			close(client.closed) // we haven't started the background updater yet, so we have to do this manually
+			_ = client.Close()
+			return nil, err
+		}
 	}
 	go withRecover(client.backgroundMetadataUpdater)
 
@@ -605,7 +607,17 @@ func (client *client) backgroundMetadataUpdater() {
 	for {
 		select {
 		case <-ticker.C:
-			if err := client.RefreshMetadata(); err != nil {
+			topics := []string{}
+			if !client.conf.Metadata.Full {
+				if specificTopics, err := client.Topics(); err != nil {
+					Logger.Println("Client background metadata topic load:", err)
+					break
+				} else {
+					topics = specificTopics
+				}
+			}
+
+			if err := client.RefreshMetadata(topics...); err != nil {
 				Logger.Println("Client background metadata update:", err)
 			}
 		case <-client.closer:
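
With Metadata.Full unset, the ticker-driven updater above only refreshes topics
the client already knows about (via client.Topics()), so a topic joins that set
the first time the client needs it. A hedged sketch of pulling a new topic in
explicitly, continuing from the client created in the earlier sketch; the topic
name "orders" is illustrative:

	// With config.Metadata.Full = false, "orders" is unknown until requested.
	// An explicit refresh (or any call that needs it, such as client.Partitions)
	// fetches its metadata and adds it to the cached set.
	if err := client.RefreshMetadata("orders"); err != nil {
		log.Println("refresh of orders failed:", err)
	}

	// The background updater will now include "orders", since client.Topics()
	// reports whatever is in the metadata cache.
	known, err := client.Topics()
	if err != nil {
		log.Println("listing topics failed:", err)
	}
	log.Println("tracked topics:", known)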

config.go  (+7 -0)

@@ -72,6 +72,12 @@ type Config struct {
 		// Defaults to 10 minutes. Set to 0 to disable. Similar to
 		// `topic.metadata.refresh.interval.ms` in the JVM version.
 		RefreshFrequency time.Duration
+
+		// Whether to maintain a full set of metadata for all topics, or just
+		// the minimal set that has been necessary so far. The full set is simpler
+		// and usually more convenient, but can take up a substantial amount of
+		// memory if you have many topics and partitions. Defaults to true.
+		Full bool
 	}
 
 	// Producer is the namespace for configuration related to producing messages,
@@ -263,6 +269,7 @@ func NewConfig() *Config {
 	c.Metadata.Retry.Max = 3
 	c.Metadata.Retry.Backoff = 250 * time.Millisecond
 	c.Metadata.RefreshFrequency = 10 * time.Minute
+	c.Metadata.Full = true
 
 	c.Producer.MaxMessageBytes = 1000000
 	c.Producer.RequiredAcks = WaitForLocal