Rework how the client handles metadata errors

Errors at the partition level of metadata should not, in general, affect the
update process itself. For the majority of such errors, Kafka provides what
data it can (e.g. on LeaderNotAvailable it still gives the list of partitions,
the ISR, etc.) and I can't think of a case where the stale data from the last
"complete" update is somehow more useful than the up-to-date partial data.
In particular, when asking for the leader, a ReplicaNotAvailable error doesn't
really matter - we still have the leader id.
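
To make that concrete, here is a minimal sketch (the field names are the ones
used on PartitionMetadata in client.go below; the literal values are invented)
of the kind of partition entry Kafka returns when a replica is unavailable:

	// A partition entry as Kafka might return it mid-rebalance: the error
	// field is set, but the leader, the replica list, and the ISR are all
	// still present and perfectly usable.
	var pm = &PartitionMetadata{
		ID:       3,
		Err:      ReplicaNotAvailable,
		Leader:   1, // the leader id is still known
		Replicas: []int32{1, 2},
		Isr:      []int32{1},
	}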

Based on this logic:
- Partition-level errors during a metadata update are no longer returned from
  the update call itself. The update is considered to have succeeded as long
  as all topics were handled successfully (mid-election partitions are still
  retried after a brief delay).
- Partition-level errors are instead returned from the actual requests for the
  relevant data: if a partition got LeaderNotAvailable, that is still returned
  from a call to Leader(), but only for that partition, while a call to
  Replicas() still returns the actual replicas with no error (see the sketch
  below).
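
Roughly, as seen by a caller (the helper function, topic name, and partition
id here are invented for illustration; the error values are the ones this
package defines):

	// Sketch: partition 0 of "events" is mid-election. Only the call that
	// actually needs the leader sees the error.
	func leaderExample(client *Client) {
		broker, err := client.Leader("events", 0)
		if err == LeaderNotAvailable {
			// broker is nil; the caller can retry after the pending refresh
		}
		_ = broker

		// Replicas() answers from the same (partial) metadata with no
		// error; it would only fail with ReplicaNotAvailable.
		replicas, err := client.Replicas("events", 0)
		_, _ = replicas, err
	}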

Side change: get rid of NoSuchTopic; Kafka already has UnknownTopicOrPartition.
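
Callers that previously compared against NoSuchTopic should compare against
the Kafka error instead; a rough sketch (the function and topic name are
invented):

	func topicExample(client *Client) {
		// before this change: if err == NoSuchTopic { ... }
		partitions, err := client.Partitions("does-not-exist")
		if err == UnknownTopicOrPartition {
			// the topic is genuinely unknown to the cluster
		}
		_ = partitions
	}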
Evan Huus committed 11 years ago · commit 1b465e78c2

2 changed files with 23 additions and 29 deletions:
  1. client.go (+23 -26)
  2. errors.go (+0 -3)

client.go (+23 -26)

@@ -127,7 +127,7 @@ func (client *Client) Partitions(topic string) ([]int32, error) {
 	// len==0 catches when it's nil (no such topic) and the odd case when every single
 	// partition is undergoing leader election simultaneously. Callers have to be able to handle
 	// this function returning an empty slice (which is a valid return value) but catching it
-	// here the first time (note we *don't* catch it below where we return NoSuchTopic) triggers
+	// here the first time (note we *don't* catch it below where we return UnknownTopicOrPartition) triggers
 	// a metadata refresh as a nicety so callers can just try again and don't have to manually
 	// trigger a refresh (otherwise they'd just keep getting a stale cached copy).
 	if len(partitions) == 0 {
@@ -139,7 +139,7 @@ func (client *Client) Partitions(topic string) ([]int32, error) {
 	}
 
 	if partitions == nil {
-		return nil, NoSuchTopic
+		return nil, UnknownTopicOrPartition
 	}
 
 	return partitions, nil
@@ -188,6 +188,9 @@ func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error)
 		return nil, err
 	}
 
+	if metadata.Err == ReplicaNotAvailable {
+		return nil, metadata.Err
+	}
 	return dupeAndSort(metadata.Replicas), nil
 }
 
@@ -198,27 +201,26 @@ func (client *Client) ReplicasInSync(topic string, partitionID int32) ([]int32,
 		return nil, err
 	}
 
+	if metadata.Err == ReplicaNotAvailable {
+		return nil, metadata.Err
+	}
 	return dupeAndSort(metadata.Isr), nil
 }
 
 // Leader returns the broker object that is the leader of the current topic/partition, as
 // determined by querying the cluster metadata.
 func (client *Client) Leader(topic string, partitionID int32) (*Broker, error) {
-	leader := client.cachedLeader(topic, partitionID)
+	leader, err := client.cachedLeader(topic, partitionID)
 
 	if leader == nil {
-		err := client.RefreshTopicMetadata(topic)
+		err = client.RefreshTopicMetadata(topic)
 		if err != nil {
 			return nil, err
 		}
-		leader = client.cachedLeader(topic, partitionID)
+		leader, err = client.cachedLeader(topic, partitionID)
 	}
 
-	if leader == nil {
-		return nil, UnknownTopicOrPartition
-	}
-
-	return leader, nil
+	return leader, err
 }
 
 // RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the
@@ -310,7 +312,7 @@ func (client *Client) refreshMetadata(topics []string, retries int) error {
 	// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
 	for _, topic := range topics {
 		if len(topic) == 0 {
-			return NoSuchTopic
+			return UnknownTopicOrPartition
 		}
 	}
 
@@ -386,7 +388,7 @@ func (client *Client) any() *Broker {
 	return client.seedBroker
 }
 
-func (client *Client) cachedLeader(topic string, partitionID int32) *Broker {
+func (client *Client) cachedLeader(topic string, partitionID int32) (*Broker, error) {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
@@ -394,11 +396,14 @@ func (client *Client) cachedLeader(topic string, partitionID int32) *Broker {
 	if partitions != nil {
 		metadata, ok := partitions[partitionID]
 		if ok {
-			return client.brokers[metadata.Leader]
+			if metadata.Err == LeaderNotAvailable {
+				return nil, metadata.Err
+			}
+			return client.brokers[metadata.Leader], nil
 		}
 	}
 
-	return nil
+	return nil, UnknownTopicOrPartition
 }
 
 func (client *Client) cachedMetadata(topic string, partitionID int32) *PartitionMetadata {
@@ -479,32 +484,24 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 
 	var err error
 	for _, topic := range data.Topics {
-		switch topic.Err {
-		case NoError:
-			break
-		case LeaderNotAvailable:
-			toRetry[topic.Name] = true
-		default:
+		if topic.Err != NoError {
 			err = topic.Err
+			continue
 		}
+
 		client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
 		for _, partition := range topic.Partitions {
+			client.metadata[topic.Name][partition.ID] = partition
 			switch partition.Err {
 			case NoError:
 				broker := client.brokers[partition.Leader]
 				if _, present := client.deadBrokerAddrs[broker.Addr()]; present {
 					if connected, _ := broker.Connected(); !connected {
 						toRetry[topic.Name] = true
-						delete(client.metadata[topic.Name], partition.ID)
-						continue
 					}
 				}
-				client.metadata[topic.Name][partition.ID] = partition
 			case LeaderNotAvailable:
 				toRetry[topic.Name] = true
-				delete(client.metadata[topic.Name], partition.ID)
-			default:
-				err = partition.Err
 			}
 		}
 	}

errors.go (+0 -3)

@@ -12,9 +12,6 @@ var OutOfBrokers = errors.New("kafka: Client has run out of available brokers to
 // ClosedClient is the error returned when a method is called on a client that has been closed.
 var ClosedClient = errors.New("kafka: Tried to use a client that was closed.")
 
-// NoSuchTopic is the error returned when the supplied topic is rejected by the Kafka servers.
-var NoSuchTopic = errors.New("kafka: Topic not recognized by brokers.")
-
 // IncompleteResponse is the error returned when the server returns a syntactically valid response, but it does
 // not contain the expected information.
 var IncompleteResponse = errors.New("kafka: Response did not contain all the expected topic/partition blocks.")