Browse Source

Merge pull request #376 from Shopify/retry-over-error

client: given both retries and errors, prefer retries
Evan Huus 10 years ago
parent
commit
6743438aae
4 changed files with 99 additions and 9 deletions
  1. 2 6
      client.go
  2. 89 0
      client_test.go
  3. 3 3
      config.go
  4. 5 0
      functional_test.go

+ 2 - 6
client.go

@@ -497,7 +497,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
 			if len(retry) > 0 {
 				if retriesRemaining <= 0 {
 					Logger.Println("Some partitions are leaderless, but we're out of retries")
-					return nil
+					return err
 				}
 				Logger.Printf("Some partitions are leaderless, waiting %dms for election... (%d retries remaining)\n",
 					client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
@@ -591,13 +591,9 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
 		client.cachedPartitionsResults[topic.Name] = partitionCache
 	}
 
-	if err != nil {
-		return nil, err
-	}
-
 	ret := make([]string, 0, len(toRetry))
 	for topic := range toRetry {
 		ret = append(ret, topic)
 	}
-	return ret, nil
+	return ret, err
 }

+ 89 - 0
client_test.go

@@ -199,6 +199,95 @@ func TestClientMetadata(t *testing.T) {
 	safeClose(t, client)
 }
 
+func TestClientReceivingUnknownTopic(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+
+	metadataResponse1 := new(MetadataResponse)
+	seedBroker.Returns(metadataResponse1)
+
+	config := NewConfig()
+	config.Metadata.Retry.Max = 1
+	config.Metadata.Retry.Backoff = 0
+	client, err := NewClient([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	metadataUnknownTopic := new(MetadataResponse)
+	metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
+	seedBroker.Returns(metadataUnknownTopic)
+	seedBroker.Returns(metadataUnknownTopic)
+
+	if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
+		t.Error("ErrUnknownTopicOrPartition expected, got", err)
+	}
+
+	// If we are asking for the leader of a partition of the non-existing topic.
+	// we will request metadata again.
+	seedBroker.Returns(metadataUnknownTopic)
+	seedBroker.Returns(metadataUnknownTopic)
+
+	if _, err = client.Leader("new_topic", 1); err != ErrUnknownTopicOrPartition {
+		t.Error("Expected ErrUnknownTopicOrPartition, got", err)
+	}
+
+	safeClose(t, client)
+	seedBroker.Close()
+}
+
+func TestClientReceivingPartialMetadata(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 5)
+
+	metadataResponse1 := new(MetadataResponse)
+	metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
+	seedBroker.Returns(metadataResponse1)
+
+	config := NewConfig()
+	config.Metadata.Retry.Max = 0
+	client, err := NewClient([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()}
+
+	metadataPartial := new(MetadataResponse)
+	metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable)
+	metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, ErrNoError)
+	metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, ErrLeaderNotAvailable)
+	seedBroker.Returns(metadataPartial)
+
+	if err := client.RefreshMetadata("new_topic"); err != nil {
+		t.Error("ErrLeaderNotAvailable should not make RefreshMetadata respond with an error")
+	}
+
+	// Even though the metadata was incomplete, we should be able to get the leader of a partition
+	// for which we did get a useful response, without doing additional requests.
+
+	partition0Leader, err := client.Leader("new_topic", 0)
+	if err != nil {
+		t.Error(err)
+	} else if partition0Leader.Addr() != leader.Addr() {
+		t.Error("Unexpected leader returned", partition0Leader.Addr())
+	}
+
+	// If we are asking for the leader of a partition that didn't have a leader before,
+	// we will do another metadata request.
+
+	seedBroker.Returns(metadataPartial)
+
+	// Still no leader for the partition, so asking for it should return an error.
+	_, err = client.Leader("new_topic", 1)
+	if err != ErrLeaderNotAvailable {
+		t.Error("Expected ErrLeaderNotAvailable, got", err)
+	}
+
+	safeClose(t, client)
+	seedBroker.Close()
+	leader.Close()
+}
+
 func TestClientRefreshBehaviour(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 5)

+ 3 - 3
config.go

@@ -192,9 +192,9 @@ func (c *Config) Validate() error {
 	switch {
 	case c.Metadata.Retry.Max < 0:
 		return ConfigurationError("Invalid Metadata.Retry.Max, must be >= 0")
-	case c.Metadata.Retry.Backoff <= time.Duration(0):
-		return ConfigurationError("Invalid Metadata.Retry.Backoff, must be > 0")
-	case c.Metadata.RefreshFrequency < time.Duration(0):
+	case c.Metadata.Retry.Backoff < 0:
+		return ConfigurationError("Invalid Metadata.Retry.Backoff, must be >= 0")
+	case c.Metadata.RefreshFrequency < 0:
 		return ConfigurationError("Invalid Metadata.RefreshFrequency, must be >= 0")
 	}
 

+ 5 - 0
functional_test.go

@@ -2,6 +2,7 @@ package sarama
 
 import (
 	"fmt"
+	"log"
 	"net"
 	"os"
 	"strings"
@@ -36,6 +37,10 @@ func init() {
 	}
 
 	kafkaShouldBeAvailable = os.Getenv("CI") != ""
+
+	if os.Getenv("DEBUG") == "true" {
+		Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
+	}
 }
 
 func checkKafkaAvailability(t *testing.T) {