Prechádzať zdrojové kódy

Shrink the scope of the partition circuit-breaker

It shouldn't wrap the entirety of `assignPartition`, since that also protects
things like the call to actually choose the partition. The only thing it is
supposed to protect is the call to get the list of partitions, since that is
what can block.
Evan Huus 11 rokov pred
rodič
commit
3e798fd0a3
1 zmenil súbory, kde vykonal 10 pridanie a 11 odobranie
  1. 10 11
      async_producer.go

+ 10 - 11
async_producer.go

@@ -271,10 +271,7 @@ func (p *asyncProducer) partitionDispatcher(topic string, input <-chan *Producer
 
 
 	for msg := range input {
 	for msg := range input {
 		if msg.retries == 0 {
 		if msg.retries == 0 {
-			err := breaker.Run(func() error {
-				return p.assignPartition(partitioner, msg)
-			})
-			if err != nil {
+			if err := p.assignPartition(breaker, partitioner, msg); err != nil {
 				p.returnError(msg, err)
 				p.returnError(msg, err)
 				continue
 				continue
 			}
 			}
@@ -636,15 +633,17 @@ func (p *asyncProducer) shutdown() {
 	close(p.successes)
 	close(p.successes)
 }
 }
 
 
-func (p *asyncProducer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
+func (p *asyncProducer) assignPartition(breaker *breaker.Breaker, partitioner Partitioner, msg *ProducerMessage) error {
 	var partitions []int32
 	var partitions []int32
-	var err error
 
 
-	if partitioner.RequiresConsistency() {
-		partitions, err = p.client.Partitions(msg.Topic)
-	} else {
-		partitions, err = p.client.WritablePartitions(msg.Topic)
-	}
+	err := breaker.Run(func() (err error) {
+		if partitioner.RequiresConsistency() {
+			partitions, err = p.client.Partitions(msg.Topic)
+		} else {
+			partitions, err = p.client.WritablePartitions(msg.Topic)
+		}
+		return
+	})
 
 
 	if err != nil {
 	if err != nil {
 		return err
 		return err