Browse Source

Merge pull request #415 from Shopify/cleanup_refresh_metadata

Refactor metadata retry logic and improve logging
Willem van Bergen 10 years ago
parent
commit
7abda07388
2 changed files with 55 additions and 68 deletions
  1. 5 1
      broker.go
  2. 50 67
      client.go

+ 5 - 1
broker.go

@@ -85,7 +85,11 @@ func (b *Broker) Open(conf *Config) error {
 		b.done = make(chan bool)
 		b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
 
-		Logger.Printf("Connected to broker %s\n", b.addr)
+		if b.id >= 0 {
+			Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
+		} else {
+			Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
+		}
 		go withRecover(b.responseReceiver)
 	})
 

+ 50 - 67
client.go

@@ -369,11 +369,11 @@ func (client *client) RefreshCoordinator(consumerGroup string) error {
 func (client *client) registerBroker(broker *Broker) {
 	if client.brokers[broker.ID()] == nil {
 		client.brokers[broker.ID()] = broker
-		Logger.Printf("client/brokers Registered new broker #%d at %s", broker.ID(), broker.Addr())
+		Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
 	} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
 		safeAsyncClose(client.brokers[broker.ID()])
 		client.brokers[broker.ID()] = broker
-		Logger.Printf("client/brokers Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
+		Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr())
 	}
 }
 
@@ -391,7 +391,7 @@ func (client *client) deregisterBroker(broker *Broker) {
 		// but we really shouldn't have to; once that loop is made better this case can be
 		// removed, and the function generally can be renamed from `deregisterBroker` to
 		// `nextSeedBroker` or something
-		Logger.Printf("client/brokers Deregistered broker #%d at %s", broker.ID(), broker.Addr())
+		Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
 		delete(client.brokers, broker.ID())
 	}
 }
@@ -400,6 +400,7 @@ func (client *client) resurrectDeadBrokers() {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
+	Logger.Printf("client/brokers resurrecting %d dead seed brokers", len(client.deadSeeds))
 	client.seedBrokers = append(client.seedBrokers, client.deadSeeds...)
 	client.deadSeeds = nil
 }
@@ -555,58 +556,52 @@ func (client *client) backgroundMetadataUpdater() {
 	}
 }
 
-func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int) error {
+func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error {
+	retry := func(err error) error {
+		if attemptsRemaining > 0 {
+			Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
+			time.Sleep(client.conf.Metadata.Retry.Backoff)
+			return client.tryRefreshMetadata(topics, attemptsRemaining-1)
+		}
+		return err
+	}
+
 	for broker := client.any(); broker != nil; broker = client.any() {
 		if len(topics) > 0 {
-			Logger.Printf("Fetching metadata for %v from broker %s\n", topics, broker.addr)
+			Logger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
 		} else {
-			Logger.Printf("Fetching metadata for all topics from broker %s\n", broker.addr)
+			Logger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
 		}
 		response, err := broker.GetMetadata(&MetadataRequest{Topics: topics})
 
 		switch err.(type) {
 		case nil:
 			// valid response, use it
-			retry, err := client.updateMetadata(response)
-
-			if len(retry) > 0 {
-				if retriesRemaining <= 0 {
-					Logger.Println("Some partitions are leaderless, but we're out of retries")
-					return err
-				}
-				Logger.Printf("Some partitions are leaderless, waiting %dms for election... (%d retries remaining)\n",
-					client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
-				time.Sleep(client.conf.Metadata.Retry.Backoff) // wait for leader election
-				return client.tryRefreshMetadata(retry, retriesRemaining-1)
+			if shouldRetry, err := client.updateMetadata(response); shouldRetry {
+				Logger.Println("client/metadata found some partitions to be leaderless")
+				return retry(err) // note: err can be nil
+			} else {
+				return err
 			}
 
-			return err
 		case PacketEncodingError:
 			// didn't even send, return the error
 			return err
 		default:
 			// some other error, remove that broker and try again
-			Logger.Println("Error from broker while fetching metadata:", err)
+			Logger.Println("client/metadata got error from broker while fetching metadata:", err)
 			_ = broker.Close()
 			client.deregisterBroker(broker)
 		}
 	}
 
-	Logger.Println("Out of available brokers.")
-
-	if retriesRemaining > 0 {
-		Logger.Printf("Resurrecting dead brokers after %dms... (%d retries remaining)\n",
-			client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
-		time.Sleep(client.conf.Metadata.Retry.Backoff)
-		client.resurrectDeadBrokers()
-		return client.tryRefreshMetadata(topics, retriesRemaining-1)
-	}
-
-	return ErrOutOfBrokers
+	Logger.Println("client/metadata no available broker to send metadata request to")
+	client.resurrectDeadBrokers()
+	return retry(ErrOutOfBrokers)
 }
 
 // if no fatal error, reports whether any topics need retrying due to ErrLeaderNotAvailable or ErrUnknownTopicOrPartition
-func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
+func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err error) {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
@@ -618,11 +613,7 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
 		client.registerBroker(broker)
 	}
 
-	toRetry := make(map[string]none)
-
-	var err error
 	for _, topic := range data.Topics {
-
 		delete(client.metadata, topic.Name)
 		delete(client.cachedPartitionsResults, topic.Name)
 
@@ -634,10 +625,10 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
 			continue
 		case ErrUnknownTopicOrPartition: // retry, do not store partial partition results
 			err = topic.Err
-			toRetry[topic.Name] = none{}
+			retry = true
 			continue
-		case ErrLeaderNotAvailable: // retry, but store partiial partition results
-			toRetry[topic.Name] = none{}
+		case ErrLeaderNotAvailable: // retry, but store partial partition results
+			retry = true
 			break
 		default: // don't retry, don't store partial results
 			Logger.Printf("Unexpected topic-level metadata error: %s", topic.Err)
@@ -649,7 +640,7 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
 		for _, partition := range topic.Partitions {
 			client.metadata[topic.Name][partition.ID] = partition
 			if partition.Err == ErrLeaderNotAvailable {
-				toRetry[topic.Name] = none{}
+				retry = true
 			}
 		}
 
@@ -659,11 +650,7 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
 		client.cachedPartitionsResults[topic.Name] = partitionCache
 	}
 
-	ret := make([]string, 0, len(toRetry))
-	for topic := range toRetry {
-		ret = append(ret, topic)
-	}
-	return ret, err
+	return
 }
 
 func (client *client) cachedCoordinator(consumerGroup string) *Broker {
@@ -676,9 +663,18 @@ func (client *client) cachedCoordinator(consumerGroup string) *Broker {
 	}
 }
 
-func (client *client) getConsumerMetadata(consumerGroup string, retriesRemaining int) (*ConsumerMetadataResponse, error) {
+func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*ConsumerMetadataResponse, error) {
+	retry := func(err error) (*ConsumerMetadataResponse, error) {
+		if attemptsRemaining > 0 {
+			Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
+			time.Sleep(client.conf.Metadata.Retry.Backoff)
+			return client.getConsumerMetadata(consumerGroup, attemptsRemaining-1)
+		}
+		return nil, err
+	}
+
 	for broker := client.any(); broker != nil; broker = client.any() {
-		Logger.Printf("client/coordinator Requesting coordinator for consumergoup %s from %s.\n", consumerGroup, broker.Addr())
+		Logger.Printf("client/coordinator requesting coordinator for consumer group %s from %s\n", consumerGroup, broker.Addr())
 
 		request := new(ConsumerMetadataRequest)
 		request.ConsumerGroup = consumerGroup
@@ -686,7 +682,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, retriesRemaining
 		response, err := broker.GetConsumerMetadata(request)
 
 		if err != nil {
-			Logger.Printf("client/coordinator Request to broker %s failed: %s.\n", broker.Addr(), err)
+			Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err)
 
 			switch err.(type) {
 			case PacketEncodingError:
@@ -700,40 +696,27 @@ func (client *client) getConsumerMetadata(consumerGroup string, retriesRemaining
 
 		switch response.Err {
 		case ErrNoError:
-			Logger.Printf("client/coordinator Coordinator for consumergoup %s is #%d (%s:%d).\n", consumerGroup, response.CoordinatorID, response.CoordinatorHost, response.CoordinatorPort)
+			Logger.Printf("client/coordinator coordinator for consumer group %s is #%d (%s)\n", consumerGroup, response.Coordinator.ID(), response.Coordinator.Addr())
 			return response, nil
 
 		case ErrConsumerCoordinatorNotAvailable:
-			Logger.Printf("client/coordinator Coordinator for consumer group %s is not available.\n", consumerGroup)
+			Logger.Printf("client/coordinator coordinator for consumer group %s is not available\n", consumerGroup)
 
 			// This is very ugly, but this scenario will only happen once per cluster.
 			// The __consumer_offsets topic only has to be created one time.
 			// The number of partitions not configurable, but partition 0 should always exist.
 			if _, err := client.Leader("__consumer_offsets", 0); err != nil {
-				Logger.Printf("client/coordinator The __consumer_offsets topic is not initialized completely yet. Waiting 2 seconds...\n")
+				Logger.Printf("client/coordinator the __consumer_offsets topic is not initialized completely yet. Waiting 2 seconds...\n")
 				time.Sleep(2 * time.Second)
 			}
 
-			if retriesRemaining > 0 {
-				Logger.Printf("Retrying after %dms... (%d retries remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
-				time.Sleep(client.conf.Metadata.Retry.Backoff)
-				return client.getConsumerMetadata(consumerGroup, retriesRemaining-1)
-			}
-			return nil, ErrConsumerCoordinatorNotAvailable
-
+			return retry(ErrConsumerCoordinatorNotAvailable)
 		default:
 			return nil, response.Err
 		}
 	}
 
-	Logger.Println("Out of available brokers to request consumer metadata from.")
-
-	if retriesRemaining > 0 {
-		Logger.Printf("Resurrecting dead brokers after %dms... (%d retries remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
-		time.Sleep(client.conf.Metadata.Retry.Backoff)
-		client.resurrectDeadBrokers()
-		return client.getConsumerMetadata(consumerGroup, retriesRemaining-1)
-	}
-
-	return nil, ErrOutOfBrokers
+	Logger.Println("client/coordinator no available broker to send consumer metadata request to")
+	client.resurrectDeadBrokers()
+	return retry(ErrOutOfBrokers)
 }