Browse Source

Merge pull request #373 from Shopify/circuitbreak-partitioning

producer: wrap partitioning in a circuit-breaker
Evan Huus 10 years ago
parent
commit
5ca5e10318
2 changed files with 9 additions and 1 deletions
  1. 5 0
      CHANGELOG.md
  2. 4 1
      async_producer.go

+ 5 - 0
CHANGELOG.md

@@ -2,6 +2,11 @@
 
 #### Unreleased
 
+Improvements:
+ - Wrap the producer's partitioner call in a circuit-breaker so that repeatedly
+   broken topics don't choke throughput
+   ([#373](https://github.com/Shopify/sarama/pull/373)).
+
 Bug Fixes:
  - Fix the producer's internal reference counting in certain unusual scenarios
    ([#367](https://github.com/Shopify/sarama/pull/367)).

+ 4 - 1
async_producer.go

@@ -265,10 +265,13 @@ func (p *asyncProducer) topicDispatcher() {
 func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMessage) {
 	handlers := make(map[int32]chan *ProducerMessage)
 	partitioner := p.conf.Producer.Partitioner(topic)
+	breaker := breaker.New(3, 1, 10*time.Second)
 
 	for msg := range input {
 		if msg.retries == 0 {
-			err := p.assignPartition(partitioner, msg)
+			err := breaker.Run(func() error {
+				return p.assignPartition(partitioner, msg)
+			})
 			if err != nil {
 				p.returnError(msg, err)
 				continue