Browse Source

Revert to individual msg retries for non-idempotent

Also do idempotent batch retries in a goroutine. This avoids the deadlock but does not
solve a race that can lead to out-of-sequence errors.

Idempotent producer is at proof-of-concept quality

Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Edoardo Comar 6 years ago
parent
commit
587c5f694a
2 changed files with 10 additions and 7 deletions
  1. 10 4
      async_producer.go
  2. 0 3
      async_producer_test.go

+ 10 - 4
async_producer.go

@@ -837,9 +837,11 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
 	})
 
 	if len(retryTopics) > 0 {
-		err := bp.parent.client.RefreshMetadata(retryTopics...)
-		if err != nil {
-			Logger.Printf("Failed refreshing metadata because of %v\n", err)
+		if bp.parent.conf.Producer.Idempotent {
+			err := bp.parent.client.RefreshMetadata(retryTopics...)
+			if err != nil {
+				Logger.Printf("Failed refreshing metadata because of %v\n", err)
+			}
 		}
 
 		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
@@ -858,9 +860,13 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
 					bp.currentRetries[topic] = make(map[int32]error)
 				}
 				bp.currentRetries[topic][partition] = block.Err
+				if bp.parent.conf.Producer.Idempotent {
+					go bp.parent.retryBatch(topic, partition, pSet, block.Err)
+				} else {
+					bp.parent.retryMessages(pSet.msgs, block.Err)
+				}
 				// dropping the following messages has the side effect of incrementing their retry count
 				bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
-				bp.parent.retryBatch(topic, partition, pSet, block.Err)
 			}
 		})
 	}

+ 0 - 3
async_producer_test.go

@@ -300,7 +300,6 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
-	leader2.Returns(metadataLeader2)
 	leader2.Returns(prodSuccess)
 	expectResults(t, producer, 10, 0)
 
@@ -468,7 +467,6 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
 	seedBroker.Returns(metadataLeader1)
 	leader1.Returns(prodNotLeader)
 	seedBroker.Returns(metadataLeader2)
-	seedBroker.Returns(metadataLeader2)
 
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
@@ -654,7 +652,6 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
 
 	// succeed this time
 	expectResults(t, producer, 5, 0)
-	seedBroker.Returns(metadataResponse)
 
 	// put five more through
 	for i := 0; i < 5; i++ {