
Merge pull request #347 from Shopify/consolidate_refresh_metadata

Consolidate refresh metadata methods into one
Willem van Bergen · 10 years ago
commit 0b4071f3ba
3 changed files with 17 additions and 24 deletions:
  1. client.go (+15 -22)
  2. consumer.go (+1 -1)
  3. producer.go (+1 -1)

+ 15 - 22
client.go

@@ -37,12 +37,10 @@ type Client interface {
 	// This method should be considered effectively deprecated.
 	ReplicasInSync(topic string, partitionID int32) ([]int32, error)
 
-	// RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the
-	// available metadata for those topics.
-	RefreshTopicMetadata(topics ...string) error
-
-	// RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.
-	RefreshAllMetadata() error
+	// RefreshMetadata takes a list of topics and queries the cluster to refresh the
+	// available metadata for those topics. If no topics are provided, it will refresh metadata
+	// for all topics.
+	RefreshMetadata(topics ...string) error
 
 	// GetOffset queries the cluster to get the most recent available offset at the given
 	// time on the topic/partition combination.
@@ -108,7 +106,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
 	_ = client.seedBroker.Open(conf)
 
 	// do an initial fetch of all cluster metadata by specifying an empty list of topics
-	err := client.RefreshAllMetadata()
+	err := client.RefreshMetadata()
 	switch err {
 	case nil:
 		break
@@ -188,7 +186,7 @@ func (client *client) Partitions(topic string) ([]int32, error) {
 	partitions := client.cachedPartitions(topic, allPartitions)
 
 	if len(partitions) == 0 {
-		err := client.RefreshTopicMetadata(topic)
+		err := client.RefreshMetadata(topic)
 		if err != nil {
 			return nil, err
 		}
@@ -217,7 +215,7 @@ func (client *client) WritablePartitions(topic string) ([]int32, error) {
 	// a metadata refresh as a nicety so callers can just try again and don't have to manually
 	// trigger a refresh (otherwise they'd just keep getting a stale cached copy).
 	if len(partitions) == 0 {
-		err := client.RefreshTopicMetadata(topic)
+		err := client.RefreshMetadata(topic)
 		if err != nil {
 			return nil, err
 		}
@@ -269,7 +267,7 @@ func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
 	leader, err := client.cachedLeader(topic, partitionID)
 
 	if leader == nil {
-		err := client.RefreshTopicMetadata(topic)
+		err := client.RefreshMetadata(topic)
 		if err != nil {
 			return nil, err
 		}
@@ -279,13 +277,8 @@ func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
 	return leader, err
 }
 
-func (client *client) RefreshTopicMetadata(topics ...string) error {
-	return client.refreshMetadata(topics, client.conf.Metadata.Retry.Max)
-}
-
-func (client *client) RefreshAllMetadata() error {
-	// Kafka refreshes all when you encode it an empty array...
-	return client.refreshMetadata(make([]string, 0), client.conf.Metadata.Retry.Max)
+func (client *client) RefreshMetadata(topics ...string) error {
+	return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max)
 }
 
 func (client *client) GetOffset(topic string, partitionID int32, where OffsetTime) (int64, error) {
@@ -398,7 +391,7 @@ func (client *client) getMetadata(topic string, partitionID int32) (*PartitionMe
 	metadata := client.cachedMetadata(topic, partitionID)
 
 	if metadata == nil {
-		err := client.RefreshTopicMetadata(topic)
+		err := client.RefreshMetadata(topic)
 		if err != nil {
 			return nil, err
 		}
@@ -489,7 +482,7 @@ func (client *client) backgroundMetadataUpdater() {
 	for {
 		select {
 		case <-ticker.C:
-			if err := client.RefreshAllMetadata(); err != nil {
+			if err := client.RefreshMetadata(); err != nil {
 				Logger.Println("Client background metadata update:", err)
 			}
 		case <-client.closer:
@@ -499,7 +492,7 @@ func (client *client) backgroundMetadataUpdater() {
 	}
 }
 
-func (client *client) refreshMetadata(topics []string, retriesRemaining int) error {
+func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int) error {
 	// This function is a sort of central point for most functions that create new
 	// resources.  Check to see if we're dealing with a closed Client and error
 	// out immediately if so.
@@ -537,7 +530,7 @@ func (client *client) refreshMetadata(topics []string, retriesRemaining int) err
 				Logger.Printf("Some partitions are leaderless, waiting %dms for election... (%d retries remaining)\n",
 					client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
 				time.Sleep(client.conf.Metadata.Retry.Backoff) // wait for leader election
-				return client.refreshMetadata(retry, retriesRemaining-1)
+				return client.tryRefreshMetadata(retry, retriesRemaining-1)
 			}
 
 			return err
@@ -558,7 +551,7 @@ func (client *client) refreshMetadata(topics []string, retriesRemaining int) err
 			client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
 		time.Sleep(client.conf.Metadata.Retry.Backoff)
 		client.resurrectDeadBrokers()
-		return client.refreshMetadata(topics, retriesRemaining-1)
+		return client.tryRefreshMetadata(topics, retriesRemaining-1)
 	}
 
 	return ErrOutOfBrokers
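
For reference, a minimal usage sketch of the consolidated API from a caller's perspective (hypothetical code, not part of this diff; the refreshTopics helper and the broker address are illustrative, and sarama.NewClient / sarama.NewConfig are assumed from the library at this point):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

// refreshTopics is a hypothetical helper: with the consolidated method, a
// topic-specific refresh and a full refresh are the same call.
func refreshTopics(client sarama.Client, topics ...string) error {
	// Previously RefreshTopicMetadata(topics...) for specific topics and
	// RefreshAllMetadata() for everything.
	return client.RefreshMetadata(topics...)
}

func main() {
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatalln(err)
	}
	defer client.Close()

	if err := refreshTopics(client, "my_topic"); err != nil { // refresh one topic
		log.Println("refresh failed:", err)
	}
	if err := refreshTopics(client); err != nil { // empty list refreshes all topics
		log.Println("refresh failed:", err)
	}
}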

+ 1 - 1
consumer.go

@@ -296,7 +296,7 @@ func (child *partitionConsumer) dispatcher() {
 }
 
 func (child *partitionConsumer) dispatch() error {
-	if err := child.consumer.client.RefreshTopicMetadata(child.topic); err != nil {
+	if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
 		return err
 	}
 

+ 1 - 1
producer.go

@@ -312,7 +312,7 @@ func (p *producer) leaderDispatcher(topic string, partition int32, input chan *P
 
 	breaker := breaker.New(3, 1, 10*time.Second)
 	doUpdate := func() (err error) {
-		if err = p.client.RefreshTopicMetadata(topic); err != nil {
+		if err = p.client.RefreshMetadata(topic); err != nil {
 			return err
 		}