瀏覽代碼

Merge pull request #1359 from slaunay/enhancement/metadata-refresh-fail-fast

Support timeout when fetching metadata
Vlad Gorodetsky 6 年之前
父節點
當前提交
a5ecebcaa3
共有 3 個文件被更改,包括 102 次插入和 4 次删除
  1. 25 4
      client.go
  2. 70 0
      client_test.go
  3. 7 0
      config.go

+ 25 - 4
client.go

@@ -435,7 +435,11 @@ func (client *client) RefreshMetadata(topics ...string) error {
 		}
 		}
 	}
 	}
 
 
-	return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max)
+	deadline := time.Time{}
+	if client.conf.Metadata.Timeout > 0 {
+		deadline = time.Now().Add(client.conf.Metadata.Timeout)
+	}
+	return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
 }
 }
 
 
 func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
 func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
@@ -737,20 +741,32 @@ func (client *client) refreshMetadata() error {
 	return nil
 	return nil
 }
 }
 
 
-func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error {
+func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
+	pastDeadline := func(backoff time.Duration) bool {
+		if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) {
+			// we are past the deadline
+			return true
+		}
+		return false
+	}
 	retry := func(err error) error {
 	retry := func(err error) error {
 		if attemptsRemaining > 0 {
 		if attemptsRemaining > 0 {
 			backoff := client.computeBackoff(attemptsRemaining)
 			backoff := client.computeBackoff(attemptsRemaining)
+			if pastDeadline(backoff) {
+				Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
+				return err
+			}
 			Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
 			Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
 			if backoff > 0 {
 			if backoff > 0 {
 				time.Sleep(backoff)
 				time.Sleep(backoff)
 			}
 			}
-			return client.tryRefreshMetadata(topics, attemptsRemaining-1)
+			return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline)
 		}
 		}
 		return err
 		return err
 	}
 	}
 
 
-	for broker := client.any(); broker != nil; broker = client.any() {
+	broker := client.any()
+	for ; broker != nil && !pastDeadline(0); broker = client.any() {
 		allowAutoTopicCreation := true
 		allowAutoTopicCreation := true
 		if len(topics) > 0 {
 		if len(topics) > 0 {
 			Logger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
 			Logger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
@@ -800,6 +816,11 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
 		}
 		}
 	}
 	}
 
 
+	if broker != nil {
+		Logger.Println("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
+		return retry(ErrOutOfBrokers)
+	}
+
 	Logger.Println("client/metadata no available broker to send metadata request to")
 	Logger.Println("client/metadata no available broker to send metadata request to")
 	client.resurrectDeadBrokers()
 	client.resurrectDeadBrokers()
 	return retry(ErrOutOfBrokers)
 	return retry(ErrOutOfBrokers)

+ 70 - 0
client_test.go

@@ -1,6 +1,7 @@
 package sarama
 package sarama
 
 
 import (
 import (
+	"fmt"
 	"io"
 	"io"
 	"sync"
 	"sync"
 	"sync/atomic"
 	"sync/atomic"
@@ -612,6 +613,75 @@ func TestClientController(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestClientMetadataTimeout(t *testing.T) {
+	for _, timeout := range []time.Duration{
+		250 * time.Millisecond, // Will cut the first retry pass
+		500 * time.Millisecond, // Will cut the second retry pass
+		750 * time.Millisecond, // Will cut the third retry pass
+		900 * time.Millisecond, // Will stop after the three retries
+	} {
+		t.Run(fmt.Sprintf("timeout=%v", timeout), func(t *testing.T) {
+			// Use a responsive broker to create a working client
+			initialSeed := NewMockBroker(t, 0)
+			emptyMetadata := new(MetadataResponse)
+			initialSeed.Returns(emptyMetadata)
+
+			conf := NewConfig()
+			// Speed up the metadata request failure because of a read timeout
+			conf.Net.ReadTimeout = 100 * time.Millisecond
+			// Disable backoff and refresh
+			conf.Metadata.Retry.Backoff = 0
+			conf.Metadata.RefreshFrequency = 0
+			// But configure a "global" timeout
+			conf.Metadata.Timeout = timeout
+			c, err := NewClient([]string{initialSeed.Addr()}, conf)
+			if err != nil {
+				t.Fatal(err)
+			}
+			initialSeed.Close()
+
+			client := c.(*client)
+
+			// Start seed brokers that do not reply to anything and therefore a read
+			// on the TCP connection will timeout to simulate unresponsive brokers
+			seed1 := NewMockBroker(t, 1)
+			defer seed1.Close()
+			seed2 := NewMockBroker(t, 2)
+			defer seed2.Close()
+
+			// Overwrite the seed brokers with a fixed ordering to make this test deterministic
+			safeClose(t, client.seedBrokers[0])
+			client.seedBrokers = []*Broker{NewBroker(seed1.Addr()), NewBroker(seed2.Addr())}
+			client.deadSeeds = []*Broker{}
+
+			// Start refreshing metadata in the background
+			errChan := make(chan error)
+			start := time.Now()
+			go func() {
+				errChan <- c.RefreshMetadata()
+			}()
+
+			// Check that the refresh fails fast enough (less than twice the configured timeout)
+			// instead of at least: 100 ms * 2 brokers * (1 initial attempt + 3 retries) = 800 ms
+			maxRefreshDuration := 2 * timeout
+			select {
+			case err := <-errChan:
+				t.Logf("Got err: %v after waiting for: %v", err, time.Since(start))
+				if err == nil {
+					t.Fatal("Expected failed RefreshMetadata, got nil")
+				}
+				if err != ErrOutOfBrokers {
+					t.Error("Expected failed RefreshMetadata with ErrOutOfBrokers, got:", err)
+				}
+			case <-time.After(maxRefreshDuration):
+				t.Fatalf("RefreshMetadata did not fail fast enough after waiting for %v", maxRefreshDuration)
+			}
+
+			safeClose(t, c)
+		})
+	}
+}
+
 func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
 func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	seedBroker := NewMockBroker(t, 1)
 	staleCoordinator := NewMockBroker(t, 2)
 	staleCoordinator := NewMockBroker(t, 2)

+ 7 - 0
config.go

@@ -121,6 +121,13 @@ type Config struct {
 		// and usually more convenient, but can take up a substantial amount of
 		// and usually more convenient, but can take up a substantial amount of
 		// memory if you have many topics and partitions. Defaults to true.
 		// memory if you have many topics and partitions. Defaults to true.
 		Full bool
 		Full bool
+
+		// How long to wait for a successful metadata response.
+		// Disabled by default which means a metadata request against an unreachable
+		// cluster (all brokers are unreachable or unresponsive) can take up to
+		// `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + Metadata.Retry.Backoff * Metadata.Retry.Max`
+		// to fail.
+		Timeout time.Duration
 	}
 	}
 
 
 	// Producer is the namespace for configuration related to producing messages,
 	// Producer is the namespace for configuration related to producing messages,