single metadata refresh for all retries

Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Author: Edoardo Comar
Commit: eb38bbc24f
2 changed files with 39 additions and 34 deletions:
  1. async_producer.go (+33, -19)
  2. async_producer_test.go (+6, -15)

async_producer.go (+33, -19)

@@ -797,6 +797,7 @@ func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
 func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
 	// we iterate through the blocks in the request set, not the response, so that we notice
 	// if the response is missing a block completely
+	var retryTopics []string
 	sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
 		if response == nil {
 			// this only happens when RequiredAcks is NoResponse, so we have to assume success
@@ -828,21 +829,41 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
 		// Retriable errors
 		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
 			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
-			Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
-				bp.broker.ID(), topic, partition, block.Err)
-			if bp.currentRetries[topic] == nil {
-				bp.currentRetries[topic] = make(map[int32]error)
-			}
-			bp.currentRetries[topic][partition] = block.Err
-			// dropping the following messages has the side effect of incrementing their retry count
-			bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
-			bp.parent.retryBatch(topic, partition, pSet, block.Err)
-
+			retryTopics = append(retryTopics, topic)
 		// Other non-retriable errors
 		default:
 			bp.parent.returnErrors(pSet.msgs, block.Err)
 		}
 	})
+
+	if len(retryTopics) > 0 {
+		err := bp.parent.client.RefreshMetadata(retryTopics...)
+		if err != nil {
+			Logger.Printf("Failed refreshing metadata because of %v\n", err)
+		}
+
+		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
+			block := response.GetBlock(topic, partition)
+			if block == nil {
+				// handled in the previous "eachPartition" loop
+				return
+			}
+
+			switch block.Err {
+			case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
+				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
+				Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
+					bp.broker.ID(), topic, partition, block.Err)
+				if bp.currentRetries[topic] == nil {
+					bp.currentRetries[topic] = make(map[int32]error)
+				}
+				bp.currentRetries[topic][partition] = block.Err
+				// dropping the following messages has the side effect of incrementing their retry count
+				bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
+				bp.parent.retryBatch(topic, partition, pSet, block.Err)
+			}
+		})
+	}
 }
 
 func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
@@ -859,15 +880,8 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio
 		}
 		msg.retries++
 	}
-	// extremely pessimistic strategy - refreshing metadata for every batch retried. Should be improved
-	err := p.client.RefreshMetadata(topic)
-	if err != nil {
-		Logger.Printf("Failed retrying batch for %v-%d because of %v while refreshing metadata\n", topic, partition, err)
-		for _, msg := range pSet.msgs {
-			p.returnError(msg, kerr)
-		}
-		return
-	}
+
+	// it's expected that a metadata refresh has been requested prior to calling retryBatch
 	leader, err := p.client.Leader(topic, partition)
 	if err != nil {
 		Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)

async_producer_test.go (+6, -15)

@@ -760,12 +760,9 @@ func TestAsyncProducerNoReturns(t *testing.T) {
 func TestAsyncProducerIdempotentGoldenPath(t *testing.T) {
 	broker := NewMockBroker(t, 1)
 
-	clusterID := "cid"
 	metadataResponse := &MetadataResponse{
-		Version:        3,
-		ThrottleTimeMs: 0,
-		ClusterID:      &clusterID,
-		ControllerID:   1,
+		Version:      1,
+		ControllerID: 1,
 	}
 	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
 	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
@@ -821,12 +818,9 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
 	for _, test := range tests {
 		broker := NewMockBroker(t, 1)
 
-		clusterID := "cid"
 		metadataResponse := &MetadataResponse{
-			Version:        3,
-			ThrottleTimeMs: 0,
-			ClusterID:      &clusterID,
-			ControllerID:   1,
+			Version:      1,
+			ControllerID: 1,
 		}
 		metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
 		metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
@@ -966,12 +960,9 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
 func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
 	broker := NewMockBroker(t, 1)
 
-	clusterID := "cid"
 	metadataResponse := &MetadataResponse{
-		Version:        3,
-		ThrottleTimeMs: 0,
-		ClusterID:      &clusterID,
-		ControllerID:   1,
+		Version:      1,
+		ControllerID: 1,
 	}
 	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
 	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)