Преглед изворни кода

Merge pull request #1189 from segmentio/maintain-producer-metadata-when-retries-disabled

Maintain metadata in the producer even when retries are disabled.
Vlad Gorodetsky пре 6 година
родитељ
комит
45ea73e0fb
3 измењених фајлова са 146 додато и 5 уклоњено
  1. 39 5
      async_producer.go
  2. 62 0
      async_producer_test.go
  3. 45 0
      sync_producer_test.go

+ 39 - 5
async_producer.go

@@ -493,7 +493,27 @@ func (pp *partitionProducer) dispatch() {
 		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
 	}
 
+	defer func() {
+		if pp.brokerProducer != nil {
+			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
+		}
+	}()
+
 	for msg := range pp.input {
+
+		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
+			select {
+			case <-pp.brokerProducer.abandoned:
+				// a message on the abandoned channel means that our current broker selection is out of date
+				Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
+				pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
+				pp.brokerProducer = nil
+				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+			default:
+				// producer connection is still open.
+			}
+		}
+
 		if msg.retries > pp.highWatermark {
 			// a new, higher, retry level; handle it and then back off
 			pp.newHighWatermark(msg.retries)
@@ -533,10 +553,6 @@ func (pp *partitionProducer) dispatch() {
 
 		pp.brokerProducer.input <- msg
 	}
-
-	if pp.brokerProducer != nil {
-		pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
-	}
 }
 
 func (pp *partitionProducer) newHighWatermark(hwm int) {
@@ -637,6 +653,10 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
 		close(responses)
 	})
 
+	if p.conf.Producer.Retry.Max <= 0 {
+		bp.abandoned = make(chan struct{})
+	}
+
 	return bp
 }
 
@@ -655,6 +675,7 @@ type brokerProducer struct {
 	input     chan *ProducerMessage
 	output    chan<- *produceSet
 	responses <-chan *brokerProducerResponse
+	abandoned chan struct{}
 
 	buffer     *produceSet
 	timer      <-chan time.Time
@@ -829,9 +850,17 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
 		// Retriable errors
 		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
 			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
-			retryTopics = append(retryTopics, topic)
+			if bp.parent.conf.Producer.Retry.Max <= 0 {
+				bp.parent.abandonBrokerConnection(bp.broker)
+				bp.parent.returnErrors(pSet.msgs, block.Err)
+			} else {
+				retryTopics = append(retryTopics, topic)
+			}
 		// Other non-retriable errors
 		default:
+			if bp.parent.conf.Producer.Retry.Max <= 0 {
+				bp.parent.abandonBrokerConnection(bp.broker)
+			}
 			bp.parent.returnErrors(pSet.msgs, block.Err)
 		}
 	})
@@ -1048,5 +1077,10 @@ func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
 	p.brokerLock.Lock()
 	defer p.brokerLock.Unlock()
 
+	bc, ok := p.brokers[broker]
+	if ok && bc.abandoned != nil {
+		close(bc.abandoned)
+	}
+
 	delete(p.brokers, broker)
 }

+ 62 - 0
async_producer_test.go

@@ -307,6 +307,68 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
 	closeProducer(t, producer)
 }
 
+func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
+
+	tt := func(t *testing.T, kErr KError) {
+		seedBroker := NewMockBroker(t, 1)
+		leader1 := NewMockBroker(t, 2)
+		leader2 := NewMockBroker(t, 3)
+
+		metadataLeader1 := new(MetadataResponse)
+		metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
+		metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
+		metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
+		seedBroker.Returns(metadataLeader1)
+
+		config := NewConfig()
+		config.Producer.Flush.Messages = 2
+		config.Producer.Return.Successes = true
+		config.Producer.Retry.Max = 0 // disable!
+		config.Producer.Retry.Backoff = 0
+		config.Producer.Partitioner = NewManualPartitioner
+		producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
+		if err != nil {
+			t.Fatal(err)
+		}
+		seedBroker.Close()
+
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
+		prodNotLeader := new(ProduceResponse)
+		prodNotLeader.AddTopicPartition("my_topic", 0, kErr)
+		prodNotLeader.AddTopicPartition("my_topic", 1, kErr)
+		leader1.Returns(prodNotLeader)
+		expectResults(t, producer, 0, 2)
+
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
+		metadataLeader2 := new(MetadataResponse)
+		metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
+		metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
+		metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, ErrNoError)
+		leader1.Returns(metadataLeader2)
+		leader1.Returns(metadataLeader2)
+
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
+		prodSuccess := new(ProduceResponse)
+		prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+		prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
+		leader2.Returns(prodSuccess)
+		expectResults(t, producer, 2, 0)
+
+		leader1.Close()
+		leader2.Close()
+		closeProducer(t, producer)
+	}
+
+	t.Run("retriable error", func(t *testing.T) {
+		tt(t, ErrNotLeaderForPartition)
+	})
+
+	t.Run("non-retriable error", func(t *testing.T) {
+		tt(t, ErrNotController)
+	})
+}
+
 func TestAsyncProducerEncoderFailures(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	leader := NewMockBroker(t, 2)

+ 45 - 0
sync_producer_test.go

@@ -177,6 +177,51 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) {
 	broker.Close()
 }
 
+func TestSyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	leader1 := NewMockBroker(t, 2)
+	leader2 := NewMockBroker(t, 3)
+
+	metadataLeader1 := new(MetadataResponse)
+	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataLeader1)
+
+	config := NewConfig()
+	config.Producer.Retry.Max = 0 // disable!
+	config.Producer.Retry.Backoff = 0
+	config.Producer.Return.Successes = true
+	producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	seedBroker.Close()
+
+	prodNotLeader := new(ProduceResponse)
+	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
+	leader1.Returns(prodNotLeader)
+	_, _, err = producer.SendMessage(&ProducerMessage{Topic: "my_topic", Value: StringEncoder(TestMessage)})
+	if err != ErrNotLeaderForPartition {
+		t.Fatal(err)
+	}
+
+	metadataLeader2 := new(MetadataResponse)
+	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
+	leader1.Returns(metadataLeader2)
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader2.Returns(prodSuccess)
+	_, _, err = producer.SendMessage(&ProducerMessage{Topic: "my_topic", Value: StringEncoder(TestMessage)})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	leader1.Close()
+	leader2.Close()
+	safeClose(t, producer)
+}
+
 // This example shows the basic usage pattern of the SyncProducer.
 func ExampleSyncProducer() {
 	producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)