Browse Source

Abandon broker connection for any error when retries are disabled

Steve van Loben Sels 6 years ago
parent
commit
d86d8e1f78
2 changed files with 57 additions and 43 deletions
  1. 4 1
      async_producer.go
  2. 53 42
      async_producer_test.go

+ 4 - 1
async_producer.go

@@ -851,13 +851,16 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
 		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
 			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
 			if bp.parent.conf.Producer.Retry.Max <= 0 {
-				bp.parent.returnErrors(pSet.msgs, block.Err)
 				bp.parent.abandonBrokerConnection(bp.broker)
+				bp.parent.returnErrors(pSet.msgs, block.Err)
 			} else {
 				retryTopics = append(retryTopics, topic)
 			}
 		// Other non-retriable errors
 		default:
+			if bp.parent.conf.Producer.Retry.Max <= 0 {
+				bp.parent.abandonBrokerConnection(bp.broker)
+			}
 			bp.parent.returnErrors(pSet.msgs, block.Err)
 		}
 	})

+ 53 - 42
async_producer_test.go

@@ -309,54 +309,65 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
 }
 
 func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
-	leader1 := NewMockBroker(t, 2)
-	leader2 := NewMockBroker(t, 3)
 
-	metadataLeader1 := new(MetadataResponse)
-	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
-	metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
-	seedBroker.Returns(metadataLeader1)
+	tt := func(t *testing.T, kErr KError) {
+		seedBroker := NewMockBroker(t, 1)
+		leader1 := NewMockBroker(t, 2)
+		leader2 := NewMockBroker(t, 3)
 
-	config := NewConfig()
-	config.Producer.Flush.Messages = 2
-	config.Producer.Return.Successes = true
-	config.Producer.Retry.Max = 0 // disable!
-	config.Producer.Retry.Backoff = 0
-	config.Producer.Partitioner = NewManualPartitioner
-	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
-	if err != nil {
-		t.Fatal(err)
-	}
-	seedBroker.Close()
+		metadataLeader1 := new(MetadataResponse)
+		metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
+		metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
+		metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
+		seedBroker.Returns(metadataLeader1)
 
-	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
-	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
-	prodNotLeader := new(ProduceResponse)
-	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
-	prodNotLeader.AddTopicPartition("my_topic", 1, ErrNotLeaderForPartition)
-	leader1.Returns(prodNotLeader)
-	expectResults(t, producer, 0, 2)
+		config := NewConfig()
+		config.Producer.Flush.Messages = 2
+		config.Producer.Return.Successes = true
+		config.Producer.Retry.Max = 0 // disable!
+		config.Producer.Retry.Backoff = 0
+		config.Producer.Partitioner = NewManualPartitioner
+		producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
+		if err != nil {
+			t.Fatal(err)
+		}
+		seedBroker.Close()
 
-	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
-	metadataLeader2 := new(MetadataResponse)
-	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
-	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
-	metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, ErrNoError)
-	leader1.Returns(metadataLeader2)
-	leader1.Returns(metadataLeader2)
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
+		prodNotLeader := new(ProduceResponse)
+		prodNotLeader.AddTopicPartition("my_topic", 0, kErr)
+		prodNotLeader.AddTopicPartition("my_topic", 1, kErr)
+		leader1.Returns(prodNotLeader)
+		expectResults(t, producer, 0, 2)
 
-	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
-	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
-	prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
-	leader2.Returns(prodSuccess)
-	expectResults(t, producer, 2, 0)
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
+		metadataLeader2 := new(MetadataResponse)
+		metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
+		metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
+		metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, ErrNoError)
+		leader1.Returns(metadataLeader2)
+		leader1.Returns(metadataLeader2)
+
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
+		prodSuccess := new(ProduceResponse)
+		prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+		prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
+		leader2.Returns(prodSuccess)
+		expectResults(t, producer, 2, 0)
 
-	leader1.Close()
-	leader2.Close()
-	closeProducer(t, producer)
+		leader1.Close()
+		leader2.Close()
+		closeProducer(t, producer)
+	}
+
+	t.Run("retriable error", func(t *testing.T) {
+		tt(t, ErrNotLeaderForPartition)
+	})
+
+	t.Run("non-retriable error", func(t *testing.T) {
+		tt(t, ErrNotController)
+	})
 }
 
 func TestAsyncProducerEncoderFailures(t *testing.T) {