Maintain metadata in the producer even when retries are disabled.

Some applications may wish to disable retries so that they can implement custom
retry logic.  As currently written, though, setting Config.Producer.Retry.Max to
zero also disables the mechanism by which the producer updates its metadata in
response to an error.  If the producer gets an error such as
ErrNotLeaderForPartition or an io.EOF, it will never recover, even in response
to background metadata updates.  This commit resolves that issue by ensuring
that the partition producers still update the leader and corresponding input
channel even when retries are turned off.
Steve van Loben Sels · 7 years ago
commit 9f69c41903
3 changed files with 140 additions and 6 deletions

  1. async_producer.go (+44, -6)
  2. async_producer_test.go (+51, -0)
  3. sync_producer_test.go (+45, -0)
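
For context, here is a minimal usage sketch of the configuration this commit targets. The snippet is illustrative, not part of the change; it assumes the standard Sarama public API, and the unbounded re-enqueue in the error loop is deliberately naive:

	package main

	import (
		"log"

		"github.com/Shopify/sarama"
	)

	func main() {
		config := sarama.NewConfig()
		config.Producer.Retry.Max = 0        // disable internal retries entirely
		config.Producer.Return.Errors = true // deliver failures on Errors()

		producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
		if err != nil {
			log.Fatal(err)
		}
		defer producer.AsyncClose()

		// Custom retry logic: a real implementation would bound attempts.
		// Before this commit, the first ErrNotLeaderForPartition left the
		// partition producer pointed at a stale leader forever; with this
		// change its metadata refreshes, so a re-enqueued message can reach
		// the new leader.
		go func() {
			for perr := range producer.Errors() {
				producer.Input() <- perr.Msg
			}
		}()

		producer.Input() <- &sarama.ProducerMessage{
			Topic: "my_topic",
			Value: sarama.StringEncoder("hello"),
		}
	}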

+ 44 - 6
async_producer.go

@@ -493,7 +493,34 @@ func (pp *partitionProducer) dispatch() {
 		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
 	}
 
-	for msg := range pp.input {
+	defer func() {
+		if pp.brokerProducer != nil {
+			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
+		}
+	}()
+
+	for {
+		var abandoned chan struct{}
+		if pp.brokerProducer != nil {
+			abandoned = pp.brokerProducer.abandoned
+		}
+
+		var msg *ProducerMessage
+		var ok bool
+		select {
+		case <-abandoned:
+			// a message on the abandoned channel means that our current broker selection is out of date
+			Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
+			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
+			pp.brokerProducer = nil
+			time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+			continue
+		case msg, ok = <-pp.input:
+			if !ok {
+				return
+			}
+		}
+
 		if msg.retries > pp.highWatermark {
 			// a new, higher, retry level; handle it and then back off
 			pp.newHighWatermark(msg.retries)
@@ -533,10 +560,6 @@ func (pp *partitionProducer) dispatch() {
 
 		pp.brokerProducer.input <- msg
 	}
-
-	if pp.brokerProducer != nil {
-		pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
-	}
 }
 
 func (pp *partitionProducer) newHighWatermark(hwm int) {
@@ -637,6 +660,10 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
 		close(responses)
 	})
 
+	if p.conf.Producer.Retry.Max <= 0 {
+		bp.abandoned = make(chan struct{})
+	}
+
 	return bp
 }
 
@@ -655,6 +682,7 @@ type brokerProducer struct {
 	input     chan *ProducerMessage
 	output    chan<- *produceSet
 	responses <-chan *brokerProducerResponse
+	abandoned chan struct{}
 
 	buffer     *produceSet
 	timer      <-chan time.Time
@@ -829,7 +857,12 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
 		// Retriable errors
 		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
 			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
-			retryTopics = append(retryTopics, topic)
+			if bp.parent.conf.Producer.Retry.Max <= 0 {
+				bp.parent.returnErrors(pSet.msgs, block.Err)
+				bp.parent.abandonBrokerConnection(bp.broker)
+			} else {
+				retryTopics = append(retryTopics, topic)
+			}
 		// Other non-retriable errors
 		default:
 			bp.parent.returnErrors(pSet.msgs, block.Err)
@@ -1042,5 +1075,10 @@ func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
 	p.brokerLock.Lock()
 	defer p.brokerLock.Unlock()
 
+	bc, ok := p.brokers[broker]
+	if ok && bc.abandoned != nil {
+		close(bc.abandoned)
+	}
+
 	delete(p.brokers, broker)
 }
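
A note on the mechanism (a minimal standalone sketch, not Sarama code): the new abandoned field leans on two Go channel properties. Closing a channel is a one-shot broadcast that unblocks every receiver, and receiving from a nil channel blocks forever, which is why the select case in dispatch() stays inert until a brokerProducer is assigned:

	package main

	import "fmt"

	func main() {
		abandoned := make(chan struct{})
		done := make(chan struct{})

		for i := 0; i < 3; i++ {
			go func(id int) {
				<-abandoned // a receive on a closed channel returns immediately
				fmt.Printf("worker %d: leader abandoned, re-resolving\n", id)
				done <- struct{}{}
			}(i)
		}

		close(abandoned) // one close wakes every waiter at once
		for i := 0; i < 3; i++ {
			<-done
		}
	}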

+ 51 - 0
async_producer_test.go

@@ -308,6 +308,57 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
 	closeProducer(t, producer)
 }
 
+func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	leader1 := NewMockBroker(t, 2)
+	leader2 := NewMockBroker(t, 3)
+
+	metadataLeader1 := new(MetadataResponse)
+	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
+	metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataLeader1)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 2
+	config.Producer.Return.Successes = true
+	config.Producer.Retry.Max = 0 // disable!
+	config.Producer.Retry.Backoff = 0
+	config.Producer.Partitioner = NewManualPartitioner
+	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	seedBroker.Close()
+
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
+	prodNotLeader := new(ProduceResponse)
+	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
+	prodNotLeader.AddTopicPartition("my_topic", 1, ErrNotLeaderForPartition)
+	leader1.Returns(prodNotLeader)
+	expectResults(t, producer, 0, 2)
+
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
+	metadataLeader2 := new(MetadataResponse)
+	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
+	metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, ErrNoError)
+	leader1.Returns(metadataLeader2)
+
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
+	leader1.Returns(metadataLeader2)
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
+	leader2.Returns(prodSuccess)
+	expectResults(t, producer, 2, 0)
+
+	leader1.Close()
+	leader2.Close()
+	closeProducer(t, producer)
+}
+
 func TestAsyncProducerEncoderFailures(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	leader := NewMockBroker(t, 2)

+ 45 - 0
sync_producer_test.go

@@ -177,6 +177,51 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) {
 	broker.Close()
 }
 
+func TestSyncProducerRefreshMetadata(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	leader1 := NewMockBroker(t, 2)
+	leader2 := NewMockBroker(t, 3)
+
+	metadataLeader1 := new(MetadataResponse)
+	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataLeader1)
+
+	config := NewConfig()
+	config.Producer.Retry.Max = 0 // disable!
+	config.Producer.Retry.Backoff = 0
+	config.Producer.Return.Successes = true
+	producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	seedBroker.Close()
+
+	prodNotLeader := new(ProduceResponse)
+	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
+	leader1.Returns(prodNotLeader)
+	_, _, err = producer.SendMessage(&ProducerMessage{Topic: "my_topic", Value: StringEncoder(TestMessage)})
+	if err != ErrNotLeaderForPartition {
+		t.Fatal(err)
+	}
+
+	metadataLeader2 := new(MetadataResponse)
+	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
+	leader1.Returns(metadataLeader2)
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader2.Returns(prodSuccess)
+	_, _, err = producer.SendMessage(&ProducerMessage{Topic: "my_topic", Value: StringEncoder(TestMessage)})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	leader1.Close()
+	leader2.Close()
+	safeClose(t, producer)
+}
+
 // This example shows the basic usage pattern of the SyncProducer.
 func ExampleSyncProducer() {
 	producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
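
Finally, a hedged sketch of the caller-side pattern the sync test exercises; the helper name sendWithCustomRetry and its parameters are illustrative, not part of Sarama. With Producer.Retry.Max = 0, SendMessage surfaces the broker error immediately, and because this commit keeps the partition producer's leader metadata fresh, a plain retry loop can succeed on the next attempt:

	import (
		"time"

		"github.com/Shopify/sarama"
	)

	func sendWithCustomRetry(p sarama.SyncProducer, msg *sarama.ProducerMessage, attempts int, backoff time.Duration) error {
		var err error
		for i := 0; i < attempts; i++ {
			if _, _, err = p.SendMessage(msg); err == nil {
				return nil
			}
			// With retries disabled the first ErrNotLeaderForPartition lands
			// here; the partition producer has already abandoned the stale
			// broker, so the next attempt is dispatched to the new leader.
			time.Sleep(backoff)
		}
		return err
	}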