Browse Source

Add Config.Metadata.Retry.BackoffFunc

Tom Lee cách đây 7 năm
mục cha
commit
a047cd70e0
5 tập tin đã thay đổi với 90 bổ sung và 6 xóa
  1. 17 3
      client.go
  2. 38 0
      client_test.go
  3. 4 0
      config.go
  4. 10 1
      offset_manager.go
  5. 21 2
      offset_manager_test.go

+ 17 - 3
client.go

@@ -687,8 +687,11 @@ func (client *client) refreshMetadata() error {
 func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error {
 	retry := func(err error) error {
 		if attemptsRemaining > 0 {
+			backoff := client.computeBackoff(attemptsRemaining)
 			Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
-			time.Sleep(client.conf.Metadata.Retry.Backoff)
+			if backoff > 0 {
+				time.Sleep(backoff)
+			}
 			return client.tryRefreshMetadata(topics, attemptsRemaining-1)
 		}
 		return err
@@ -816,11 +819,22 @@ func (client *client) cachedController() *Broker {
 	return client.brokers[client.controllerID]
 }
 
+// computeBackoff returns the delay to sleep before the next metadata/coordinator
+// retry. If the user supplied Metadata.Retry.BackoffFunc, it is invoked with the
+// number of retries performed so far (maxRetries - attemptsRemaining) and the
+// configured maximum; otherwise the static Metadata.Retry.Backoff is returned.
+func (client *client) computeBackoff(attemptsRemaining int) time.Duration {
+	if client.conf.Metadata.Retry.BackoffFunc != nil {
+		maxRetries := client.conf.Metadata.Retry.Max
+		retries := maxRetries - attemptsRemaining
+		return client.conf.Metadata.Retry.BackoffFunc(retries, maxRetries)
+	} else {
+		return client.conf.Metadata.Retry.Backoff
+	}
+}
+
 func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
 	retry := func(err error) (*FindCoordinatorResponse, error) {
 		if attemptsRemaining > 0 {
-			Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
-			time.Sleep(client.conf.Metadata.Retry.Backoff)
+			backoff := client.computeBackoff(attemptsRemaining)
+			Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
+			time.Sleep(backoff)
 			return client.getConsumerMetadata(consumerGroup, attemptsRemaining-1)
 		}
 		return nil, err

+ 38 - 0
client_test.go

@@ -3,6 +3,7 @@ package sarama
 import (
 	"io"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 )
@@ -260,6 +261,43 @@ func TestClientGetOffset(t *testing.T) {
 	safeClose(t, client)
 }
 
+// TestClientReceivingUnknownTopicWithBackoffFunc verifies that the custom
+// Metadata.Retry.BackoffFunc is consulted when a metadata refresh is retried:
+// with Retry.Max = 1, refreshing an unknown topic retries exactly once, so the
+// callback must fire exactly one time.
+func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+
+	metadataResponse1 := new(MetadataResponse)
+	seedBroker.Returns(metadataResponse1)
+
+	retryCount := int32(0)
+
+	config := NewConfig()
+	config.Metadata.Retry.Max = 1
+	// Count invocations atomically and return 0 so the test does not sleep
+	// (tryRefreshMetadata only sleeps when the computed backoff is > 0).
+	config.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
+		atomic.AddInt32(&retryCount, 1)
+		return 0
+	}
+	client, err := NewClient([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	metadataUnknownTopic := new(MetadataResponse)
+	metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
+	seedBroker.Returns(metadataUnknownTopic)
+	seedBroker.Returns(metadataUnknownTopic)
+
+	if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
+		t.Error("ErrUnknownTopicOrPartition expected, got", err)
+	}
+
+	safeClose(t, client)
+	seedBroker.Close()
+
+	actualRetryCount := atomic.LoadInt32(&retryCount)
+	if actualRetryCount != 1 {
+		t.Fatalf("Expected BackoffFunc to be called exactly once, but saw %d", actualRetryCount)
+	}
+}
+
 func TestClientReceivingUnknownTopic(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 

+ 4 - 0
config.go

@@ -84,6 +84,10 @@ type Config struct {
 			// How long to wait for leader election to occur before retrying
 			// (default 250ms). Similar to the JVM's `retry.backoff.ms`.
 			Backoff time.Duration
+			// Called to compute backoff time dynamically. Useful for implementing
+			// more sophisticated backoff strategies. This takes precedence over
+			// `Backoff` if set.
+			BackoffFunc func(retries, maxRetries int) time.Duration
 		}
 		// How frequently to refresh the cluster metadata in the background.
 		// Defaults to 10 minutes. Set to 0 to disable. Similar to

+ 10 - 1
offset_manager.go

@@ -110,6 +110,14 @@ func (om *offsetManager) Close() error {
 	return nil
 }
 
+// computeBackoff returns how long fetchInitialOffset should wait before its
+// next retry. When a Metadata.Retry.BackoffFunc is configured it is invoked
+// with the number of retries performed so far and the configured maximum —
+// mirroring client.computeBackoff, whose caller also counts attempts down.
+// (The previous version passed the *remaining* count as `retries`, which
+// inverted user-supplied exponential-backoff strategies.) Otherwise the
+// static Metadata.Retry.Backoff is returned.
+func (om *offsetManager) computeBackoff(retries int) time.Duration {
+	if om.conf.Metadata.Retry.BackoffFunc != nil {
+		// `retries` counts down toward 0 here, so convert it to the
+		// number of attempts already made before calling the user func.
+		maxRetries := om.conf.Metadata.Retry.Max
+		return om.conf.Metadata.Retry.BackoffFunc(maxRetries-retries, maxRetries)
+	}
+	return om.conf.Metadata.Retry.Backoff
+}
+
 func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, string, error) {
 	broker, err := om.coordinator()
 	if err != nil {
@@ -151,10 +159,11 @@ func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retri
 		if retries <= 0 {
 			return 0, "", block.Err
 		}
+		backoff := om.computeBackoff(retries)
 		select {
 		case <-om.closing:
 			return 0, "", block.Err
-		case <-time.After(om.conf.Metadata.Retry.Backoff):
+		case <-time.After(backoff):
 		}
 		return om.fetchInitialOffset(topic, partition, retries-1)
 	default:

+ 21 - 2
offset_manager_test.go

@@ -1,15 +1,20 @@
 package sarama
 
 import (
+	"sync/atomic"
 	"testing"
 	"time"
 )
 
-func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
+func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
+	backoffFunc func(retries, maxRetries int) time.Duration) (om OffsetManager,
 	testClient Client, broker, coordinator *MockBroker) {
 
 	config := NewConfig()
 	config.Metadata.Retry.Max = 1
+	if backoffFunc != nil {
+		config.Metadata.Retry.BackoffFunc = backoffFunc
+	}
 	config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
 	config.Version = V0_9_0_0
 	if retention > 0 {
@@ -45,6 +50,11 @@ func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
 	return om, testClient, broker, coordinator
 }
 
+// initOffsetManager builds an offset manager with the default (static) retry
+// backoff by delegating to initOffsetManagerWithBackoffFunc with a nil
+// BackoffFunc; kept so existing tests need no changes.
+func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
+	testClient Client, broker, coordinator *MockBroker) {
+	return initOffsetManagerWithBackoffFunc(t, retention, nil)
+}
+
 func initPartitionOffsetManager(t *testing.T, om OffsetManager,
 	coordinator *MockBroker, initialOffset int64, metadata string) PartitionOffsetManager {
 
@@ -133,7 +143,12 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) {
 
 // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
 func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
-	om, testClient, broker, coordinator := initOffsetManager(t, 0)
+	retryCount := int32(0)
+	backoff := func(retries, maxRetries int) time.Duration {
+		atomic.AddInt32(&retryCount, 1)
+		return 0
+	}
+	om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff)
 
 	// Error on first fetchInitialOffset call
 	responseBlock := OffsetFetchResponseBlock{
@@ -163,6 +178,10 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
 	safeClose(t, pom)
 	safeClose(t, om)
 	safeClose(t, testClient)
+
+	if atomic.LoadInt32(&retryCount) == 0 {
+		t.Fatal("Expected at least one retry")
+	}
 }
 
 func TestPartitionOffsetManagerInitialOffset(t *testing.T) {