Procházet zdrojové kódy

Add Config.Producer.Retry.BackoffFunc

Tom Lee před 7 roky
rodič
revize
7d903c1bb8
3 změnil soubory, kde provedl 87 přidání a 2 odebrání
  1. 15 2
      async_producer.go
  2. 68 0
      async_producer_test.go
  3. 4 0
      config.go

+ 15 - 2
async_producer.go

@@ -426,6 +426,19 @@ func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan
 	return input
 }
 
+func (pp *partitionProducer) backoff(retries int) {
+	var backoff time.Duration
+	if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
+		maxRetries := pp.parent.conf.Producer.Retry.Max
+		backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
+	} else {
+		backoff = pp.parent.conf.Producer.Retry.Backoff
+	}
+	if backoff > 0 {
+		time.Sleep(backoff)
+	}
+}
+
 func (pp *partitionProducer) dispatch() {
 	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
 	// on the first message
@@ -440,7 +453,7 @@ func (pp *partitionProducer) dispatch() {
 		if msg.retries > pp.highWatermark {
 			// a new, higher, retry level; handle it and then back off
 			pp.newHighWatermark(msg.retries)
-			time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+			pp.backoff(msg.retries)
 		} else if pp.highWatermark > 0 {
 			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
 			if msg.retries < pp.highWatermark {
@@ -468,7 +481,7 @@ func (pp *partitionProducer) dispatch() {
 		if pp.output == nil {
 			if err := pp.updateLeader(); err != nil {
 				pp.parent.returnError(msg, err)
-				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+				pp.backoff(msg.retries)
 				continue
 			}
 			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())

+ 68 - 0
async_producer_test.go

@@ -6,6 +6,7 @@ import (
 	"os"
 	"os/signal"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 )
@@ -484,6 +485,73 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
 	closeProducer(t, producer)
 }
 
+func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	leader1 := NewMockBroker(t, 2)
+	leader2 := NewMockBroker(t, 3)
+
+	metadataLeader1 := new(MetadataResponse)
+	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataLeader1)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 1
+	config.Producer.Return.Successes = true
+	config.Producer.Retry.Max = 4
+
+	backoffCalled := make([]int32, config.Producer.Retry.Max+1)
+	config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
+		atomic.AddInt32(&backoffCalled[retries-1], 1)
+		return 0
+	}
+	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	prodNotLeader := new(ProduceResponse)
+	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
+
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+
+	metadataLeader2 := new(MetadataResponse)
+	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
+
+	leader1.Returns(prodNotLeader)
+	seedBroker.Returns(metadataLeader2)
+	leader2.Returns(prodNotLeader)
+	seedBroker.Returns(metadataLeader1)
+	leader1.Returns(prodNotLeader)
+	seedBroker.Returns(metadataLeader1)
+	leader1.Returns(prodNotLeader)
+	seedBroker.Returns(metadataLeader2)
+	leader2.Returns(prodSuccess)
+
+	expectResults(t, producer, 1, 0)
+
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	leader2.Returns(prodSuccess)
+	expectResults(t, producer, 1, 0)
+
+	seedBroker.Close()
+	leader1.Close()
+	leader2.Close()
+	closeProducer(t, producer)
+
+	for i := 0; i < config.Producer.Retry.Max; i++ {
+		if atomic.LoadInt32(&backoffCalled[i]) != 1 {
+			t.Errorf("expected one retry attempt #%d", i)
+		}
+	}
+	if atomic.LoadInt32(&backoffCalled[config.Producer.Retry.Max]) != 0 {
+		t.Errorf("expected no retry attempt #%d", config.Producer.Retry.Max)
+	}
+}
+
 func TestAsyncProducerOutOfRetries(t *testing.T) {
 	t.Skip("Enable once bug #294 is fixed.")
 

+ 4 - 0
config.go

@@ -168,6 +168,10 @@ type Config struct {
 			// (default 100ms). Similar to the `retry.backoff.ms` setting of the
 			// JVM producer.
 			Backoff time.Duration
+			// Called to compute backoff time dynamically. Useful for implementing
+			// more sophisticated backoff strategies. This takes precedence over
+			// `Backoff` if set.
+			BackoffFunc func(retries, maxRetries int) time.Duration
 		}
 	}