Преглед на файлове

Add Config.Consumer.Retry.BackoffFunc

Tom Lee преди 7 години
родител
ревизия
d6f3bc3c43
променени са 3 файла, в които са добавени 54 реда и са изтрити 9 реда
  1. 4 0
      config.go
  2. 16 1
      consumer.go
  3. 34 8
      consumer_test.go

+ 4 - 0
config.go

@@ -189,6 +189,10 @@ type Config struct {
 			// How long to wait after a failing to read from a partition before
 			// trying again (default 2s).
 			Backoff time.Duration
+			// Called to compute backoff time dynamically. Useful for implementing
+			// more sophisticated backoff strategies. This takes precedence over
+			// `Backoff` if set.
+			BackoffFunc func(retries int) time.Duration
 		}
 
 		// Fetch is the namespace for controlling how many bytes are retrieved by any

+ 16 - 1
consumer.go

@@ -314,6 +314,8 @@ type partitionConsumer struct {
 
 	fetchSize int32
 	offset    int64
+
+	retries int32
 }
 
 var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
@@ -332,12 +334,21 @@ func (child *partitionConsumer) sendError(err error) {
 	}
 }
 
+func (child *partitionConsumer) computeBackoff() time.Duration {
+	if child.conf.Consumer.Retry.BackoffFunc != nil {
+		retries := atomic.AddInt32(&child.retries, 1)
+		return child.conf.Consumer.Retry.BackoffFunc(int(retries))
+	} else {
+		return child.conf.Consumer.Retry.Backoff
+	}
+}
+
 func (child *partitionConsumer) dispatcher() {
 	for range child.trigger {
 		select {
 		case <-child.dying:
 			close(child.trigger)
-		case <-time.After(child.conf.Consumer.Retry.Backoff):
+		case <-time.After(child.computeBackoff()):
 			if child.broker != nil {
 				child.consumer.unrefBrokerConsumer(child.broker)
 				child.broker = nil
@@ -451,6 +462,10 @@ feederLoop:
 	for response := range child.feeder {
 		msgs, child.responseResult = child.parseResponse(response)
 
+		if child.responseResult == nil {
+			atomic.StoreInt32(&child.retries, 0)
+		}
+
 		for i, msg := range msgs {
 		messageSelect:
 			select {

+ 34 - 8
consumer_test.go

@@ -5,6 +5,7 @@ import (
 	"os"
 	"os/signal"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 )
@@ -180,9 +181,7 @@ func TestConsumerDuplicate(t *testing.T) {
 	broker0.Close()
 }
 
-// If consumer fails to refresh metadata it keeps retrying with frequency
-// specified by `Config.Consumer.Retry.Backoff`.
-func TestConsumerLeaderRefreshError(t *testing.T) {
+func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) {
 	// Given
 	broker0 := NewMockBroker(t, 100)
 
@@ -200,11 +199,6 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
 			SetMessage("my_topic", 0, 123, testMsg),
 	})
 
-	config := NewConfig()
-	config.Net.ReadTimeout = 100 * time.Millisecond
-	config.Consumer.Retry.Backoff = 200 * time.Millisecond
-	config.Consumer.Return.Errors = true
-	config.Metadata.Retry.Max = 0
 	c, err := NewConsumer([]string{broker0.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
@@ -258,6 +252,38 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
 	broker0.Close()
 }
 
+// If consumer fails to refresh metadata it keeps retrying with frequency
+// specified by `Config.Consumer.Retry.Backoff`.
+func TestConsumerLeaderRefreshError(t *testing.T) {
+	config := NewConfig()
+	config.Net.ReadTimeout = 100 * time.Millisecond
+	config.Consumer.Retry.Backoff = 200 * time.Millisecond
+	config.Consumer.Return.Errors = true
+	config.Metadata.Retry.Max = 0
+
+	runConsumerLeaderRefreshErrorTestWithConfig(t, config)
+}
+
+func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) {
+	var calls int32 = 0
+
+	config := NewConfig()
+	config.Net.ReadTimeout = 100 * time.Millisecond
+	config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
+		atomic.AddInt32(&calls, 1)
+		return 200 * time.Millisecond
+	}
+	config.Consumer.Return.Errors = true
+	config.Metadata.Retry.Max = 0
+
+	runConsumerLeaderRefreshErrorTestWithConfig(t, config)
+
+	// we expect at least one call to our backoff function
+	if calls == 0 {
+		t.Fail()
+	}
+}
+
 func TestConsumerInvalidTopic(t *testing.T) {
 	// Given
 	broker0 := NewMockBroker(t, 100)