Ver código fonte

Merge pull request #937 from Shopify/optional-minimal-metadata-set

Add a config option to store only minimal metadata
Evan Huus 8 anos atrás
pai
commit
55b3f6a77f
2 arquivos alterados com 32 adições e 13 exclusões
  1. 25 13
      client.go
  2. 7 0
      config.go

+ 25 - 13
client.go

@@ -141,18 +141,20 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
 		client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
 	}
 
-	// do an initial fetch of all cluster metadata by specifying an empty list of topics
-	err := client.RefreshMetadata()
-	switch err {
-	case nil:
-		break
-	case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
-		// indicates that maybe part of the cluster is down, but is not fatal to creating the client
-		Logger.Println(err)
-	default:
-		close(client.closed) // we haven't started the background updater yet, so we have to do this manually
-		_ = client.Close()
-		return nil, err
+	if conf.Metadata.Full {
+		// do an initial fetch of all cluster metadata by specifying an empty list of topics
+		err := client.RefreshMetadata()
+		switch err {
+		case nil:
+			break
+		case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
+			// indicates that maybe part of the cluster is down, but is not fatal to creating the client
+			Logger.Println(err)
+		default:
+			close(client.closed) // we haven't started the background updater yet, so we have to do this manually
+			_ = client.Close()
+			return nil, err
+		}
 	}
 	go withRecover(client.backgroundMetadataUpdater)
 
@@ -605,7 +607,17 @@ func (client *client) backgroundMetadataUpdater() {
 	for {
 		select {
 		case <-ticker.C:
-			if err := client.RefreshMetadata(); err != nil {
+			topics := []string{}
+			if !client.conf.Metadata.Full {
+				if specificTopics, err := client.Topics(); err != nil {
+					Logger.Println("Client background metadata topic load:", err)
+					break
+				} else {
+					topics = specificTopics
+				}
+			}
+
+			if err := client.RefreshMetadata(topics...); err != nil {
 				Logger.Println("Client background metadata update:", err)
 			}
 		case <-client.closer:

+ 7 - 0
config.go

@@ -72,6 +72,12 @@ type Config struct {
 		// Defaults to 10 minutes. Set to 0 to disable. Similar to
 		// `topic.metadata.refresh.interval.ms` in the JVM version.
 		RefreshFrequency time.Duration
+
+		// Whether to maintain a full set of metadata for all topics, or just
+		// the minimal set that has been necessary so far. The full set is simpler
+		// and usually more convenient, but can take up a substantial amount of
+		// memory if you have many topics and partitions. Defaults to true.
+		Full bool
 	}
 
 	// Producer is the namespace for configuration related to producing messages,
@@ -263,6 +269,7 @@ func NewConfig() *Config {
 	c.Metadata.Retry.Max = 3
 	c.Metadata.Retry.Backoff = 250 * time.Millisecond
 	c.Metadata.RefreshFrequency = 10 * time.Minute
+	c.Metadata.Full = true
 
 	c.Producer.MaxMessageBytes = 1000000
 	c.Producer.RequiredAcks = WaitForLocal