Ver código fonte

Don't retry messages until the broker is closed

Otherwise there is a case where the retried messages can have their
newly-selected broker closed out from under them if the remainder of this
goroutine gets heavily delayed by the scheduler.

I believe this may be the cause of the flaky failure in
TestAsyncProducerBrokerBounce (see e.g.
https://travis-ci.org/Shopify/sarama/jobs/66053366)
Evan Huus 10 anos atrás
pai
commit
5ad52d8407
1 arquivos alterados com 1 adições e 1 exclusões
  1. 1 1
      async_producer.go

+ 1 - 1
async_producer.go

@@ -536,9 +536,9 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage)
 		default:
 			Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
 			p.abandonBrokerConnection(broker)
-			p.retryMessages(batch, err)
 			_ = broker.Close()
 			closing = err
+			p.retryMessages(batch, err)
 			continue
 		}