浏览代码

Do not use partition cache for unknown topics.

Willem van Bergen 10 年之前
父节点
当前提交
7f2cdf6cd2
共有 6 个文件被更改,包括 139 次插入13 次删除
  1. 3 0
      CHANGELOG.md
  2. 19 5
      client.go
  3. 54 0
      client_test.go
  4. 19 0
      functional_test.go
  5. 14 8
      metadata_response.go
  6. 30 0
      sync_producer_test.go

+ 3 - 0
CHANGELOG.md

@@ -9,6 +9,9 @@ Bug Fixes:
    ([#369](https://github.com/Shopify/sarama/pull/369)).
  - Fix a condition where the producer's internal control messages could have
    gotten stuck ([#368](https://github.com/Shopify/sarama/pull/368)).
+ - Fix an issue where invalid partition lists would be cached when asking for
+   metadata for a non-existing topic ([#372](https://github.com/Shopify/sarama/pull/372)).
+
 
 #### Version 1.0.0 (2015-03-17)
 

+ 19 - 5
client.go

@@ -492,7 +492,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
 		switch err.(type) {
 		case nil:
 			// valid response, use it
-			retry, err := client.update(response)
+			retry, err := client.updateMetadata(response)
 
 			if len(retry) > 0 {
 				if retriesRemaining <= 0 {
@@ -531,7 +531,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
 }
 
 // if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
-func (client *client) update(data *MetadataResponse) ([]string, error) {
+func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
@@ -554,23 +554,37 @@ func (client *client) update(data *MetadataResponse) ([]string, error) {
 
 	var err error
 	for _, topic := range data.Topics {
+
+		delete(client.metadata, topic.Name)
+		delete(client.cachedPartitionsResults, topic.Name)
+
 		switch topic.Err {
 		case ErrNoError:
 			break
-		case ErrLeaderNotAvailable, ErrUnknownTopicOrPartition:
+		case ErrInvalidTopic: // don't retry, don't store partial results
+			err = topic.Err
+			continue
+		case ErrUnknownTopicOrPartition: // retry, do not store partial partition results
+			err = topic.Err
 			toRetry[topic.Name] = true
-		default:
+			continue
+		case ErrLeaderNotAvailable: // retry, but store partial partition results
+			toRetry[topic.Name] = true
+			break
+		default: // don't retry, don't store partial results
+			Logger.Printf("Unexpected topic-level metadata error: %s", topic.Err)
 			err = topic.Err
+			continue
 		}
 
 		client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
-		delete(client.cachedPartitionsResults, topic.Name)
 		for _, partition := range topic.Partitions {
 			client.metadata[topic.Name][partition.ID] = partition
 			if partition.Err == ErrLeaderNotAvailable {
 				toRetry[topic.Name] = true
 			}
 		}
+
 		var partitionCache [maxPartitionIndex][]int32
 		partitionCache[allPartitions] = client.setPartitionCache(topic.Name, allPartitions)
 		partitionCache[writablePartitions] = client.setPartitionCache(topic.Name, writablePartitions)

+ 54 - 0
client_test.go

@@ -65,6 +65,60 @@ func TestCachedPartitions(t *testing.T) {
 	safeClose(t, client)
 }
 
+func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+
+	replicas := []int32{seedBroker.BrokerID()}
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 1, replicas[0], replicas, replicas, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 2, replicas[0], replicas, replicas, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	config := NewConfig()
+	config.Metadata.Retry.Max = 0
+	client, err := NewClient([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	metadataResponse = new(MetadataResponse)
+	metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
+	seedBroker.Returns(metadataResponse)
+
+	partitions, err := client.Partitions("unknown")
+
+	if err != ErrUnknownTopicOrPartition {
+		t.Error("Expected ErrUnknownTopicOrPartition, found", err)
+	}
+	if partitions != nil {
+		t.Errorf("Should return nil as partition list, found %v", partitions)
+	}
+
+	// Should still use the cache of a known topic
+	partitions, err = client.Partitions("my_topic")
+	if err != nil {
+		t.Errorf("Expected no error, found %v", err)
+	}
+
+	metadataResponse = new(MetadataResponse)
+	metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
+	seedBroker.Returns(metadataResponse)
+
+	// Should not use cache for unknown topic
+	partitions, err = client.Partitions("unknown")
+	if err != ErrUnknownTopicOrPartition {
+		t.Error("Expected ErrUnknownTopicOrPartition, found", err)
+	}
+	if partitions != nil {
+		t.Errorf("Should return nil as partition list, found %v", partitions)
+	}
+
+	seedBroker.Close()
+	safeClose(t, client)
+}
+
 func TestClientSeedBrokers(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 

+ 19 - 0
functional_test.go

@@ -124,6 +124,25 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 	}
 }
 
+func TestProducingToInvalidTopic(t *testing.T) {
+	checkKafkaAvailability(t)
+
+	producer, err := NewSyncProducer(kafkaBrokers, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrInvalidTopic {
+		t.Error("Expected ErrInvalidTopic, found", err)
+	}
+
+	if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrInvalidTopic {
+		t.Error("Expected ErrInvalidTopic, found", err)
+	}
+
+	safeClose(t, producer)
+}
+
 func testProducingMessages(t *testing.T, config *Config) {
 	checkKafkaAvailability(t)
 

+ 14 - 8
metadata_response.go

@@ -182,25 +182,31 @@ func (m *MetadataResponse) AddBroker(addr string, id int32) {
 	m.Brokers = append(m.Brokers, &Broker{id: id, addr: addr})
 }
 
-func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
-	var match *TopicMetadata
+func (m *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
+	var tmatch *TopicMetadata
 
 	for _, tm := range m.Topics {
 		if tm.Name == topic {
-			match = tm
+			tmatch = tm
 			goto foundTopic
 		}
 	}
 
-	match = new(TopicMetadata)
-	match.Name = topic
-	m.Topics = append(m.Topics, match)
+	tmatch = new(TopicMetadata)
+	tmatch.Name = topic
+	m.Topics = append(m.Topics, tmatch)
 
 foundTopic:
 
+	tmatch.Err = err
+	return tmatch
+}
+
+func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
+	tmatch := m.AddTopic(topic, ErrNoError)
 	var pmatch *PartitionMetadata
 
-	for _, pm := range match.Partitions {
+	for _, pm := range tmatch.Partitions {
 		if pm.ID == partition {
 			pmatch = pm
 			goto foundPartition
@@ -209,7 +215,7 @@ foundTopic:
 
 	pmatch = new(PartitionMetadata)
 	pmatch.ID = partition
-	match.Partitions = append(match.Partitions, pmatch)
+	tmatch.Partitions = append(tmatch.Partitions, pmatch)
 
 foundPartition:
 

+ 30 - 0
sync_producer_test.go

@@ -97,6 +97,36 @@ func TestConcurrentSyncProducer(t *testing.T) {
 	seedBroker.Close()
 }
 
+func TestSyncProducerToNonExistingTopic(t *testing.T) {
+	broker := newMockBroker(t, 1)
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
+	broker.Returns(metadataResponse)
+
+	config := NewConfig()
+	config.Metadata.Retry.Max = 0
+	config.Producer.Retry.Max = 0
+
+	producer, err := NewSyncProducer([]string{broker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	metadataResponse = new(MetadataResponse)
+	metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
+	broker.Returns(metadataResponse)
+
+	_, _, err = producer.SendMessage(&ProducerMessage{Topic: "unknown"})
+	if err != ErrUnknownTopicOrPartition {
+		t.Error("Expected ErrUnknownTopicOrPartition, found:", err)
+	}
+
+	safeClose(t, producer)
+	broker.Close()
+}
+
 // This example shows the basic usage pattern of the SyncProducer.
 func ExampleSyncProducer() {
 	producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)