Browse Source

Merge pull request #467 from Shopify/shrink-partitioner-breaker

Shrink the scope of the partition circuit-breaker
Evan Huus 10 years ago
parent
commit
377669e691
2 changed files with 98 additions and 30 deletions
  1. 10 11
      async_producer.go
  2. 88 19
      async_producer_test.go

+ 10 - 11
async_producer.go

@@ -271,10 +271,7 @@ func (p *asyncProducer) partitionDispatcher(topic string, input <-chan *Producer
 
 
 	for msg := range input {
 	for msg := range input {
 		if msg.retries == 0 {
 		if msg.retries == 0 {
-			err := breaker.Run(func() error {
-				return p.assignPartition(partitioner, msg)
-			})
-			if err != nil {
+			if err := p.assignPartition(breaker, partitioner, msg); err != nil {
 				p.returnError(msg, err)
 				p.returnError(msg, err)
 				continue
 				continue
 			}
 			}
@@ -636,15 +633,17 @@ func (p *asyncProducer) shutdown() {
 	close(p.successes)
 	close(p.successes)
 }
 }
 
 
-func (p *asyncProducer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
+func (p *asyncProducer) assignPartition(breaker *breaker.Breaker, partitioner Partitioner, msg *ProducerMessage) error {
 	var partitions []int32
 	var partitions []int32
-	var err error
 
 
-	if partitioner.RequiresConsistency() {
-		partitions, err = p.client.Partitions(msg.Topic)
-	} else {
-		partitions, err = p.client.WritablePartitions(msg.Topic)
-	}
+	err := breaker.Run(func() (err error) {
+		if partitioner.RequiresConsistency() {
+			partitions, err = p.client.Partitions(msg.Topic)
+		} else {
+			partitions, err = p.client.WritablePartitions(msg.Topic)
+		}
+		return
+	})
 
 
 	if err != nil {
 	if err != nil {
 		return err
 		return err

+ 88 - 19
async_producer_test.go

@@ -1,6 +1,7 @@
 package sarama
 package sarama
 
 
 import (
 import (
+	"errors"
 	"log"
 	"log"
 	"os"
 	"os"
 	"os/signal"
 	"os/signal"
@@ -31,22 +32,48 @@ func closeProducer(t *testing.T, p AsyncProducer) {
 	wg.Wait()
 	wg.Wait()
 }
 }
 
 
-func expectSuccesses(t *testing.T, p AsyncProducer, successes int) {
-	for i := 0; i < successes; i++ {
+func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
+	for successes > 0 || errors > 0 {
 		select {
 		select {
 		case msg := <-p.Errors():
 		case msg := <-p.Errors():
-			t.Error(msg.Err)
 			if msg.Msg.flags != 0 {
 			if msg.Msg.flags != 0 {
 				t.Error("Message had flags set")
 				t.Error("Message had flags set")
 			}
 			}
+			errors--
+			if errors < 0 {
+				t.Error(msg.Err)
+			}
 		case msg := <-p.Successes():
 		case msg := <-p.Successes():
 			if msg.flags != 0 {
 			if msg.flags != 0 {
 				t.Error("Message had flags set")
 				t.Error("Message had flags set")
 			}
 			}
+			successes--
+			if successes < 0 {
+				t.Error("Too many successes")
+			}
 		}
 		}
 	}
 	}
 }
 }
 
 
+type testPartitioner chan *int32
+
+func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
+	part := <-p
+	if part == nil {
+		return 0, errors.New("BOOM")
+	}
+
+	return *part, nil
+}
+
+func (p testPartitioner) RequiresConsistency() bool {
+	return true
+}
+
+func (p testPartitioner) feed(partition int32) {
+	p <- &partition
+}
+
 func TestAsyncProducer(t *testing.T) {
 func TestAsyncProducer(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 2)
 	leader := newMockBroker(t, 2)
@@ -120,7 +147,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) {
 		for i := 0; i < 5; i++ {
 		for i := 0; i < 5; i++ {
 			producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 			producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 		}
 		}
-		expectSuccesses(t, producer, 5)
+		expectResults(t, producer, 5, 0)
 	}
 	}
 
 
 	closeProducer(t, producer)
 	closeProducer(t, producer)
@@ -160,7 +187,7 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) {
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
 	}
-	expectSuccesses(t, producer, 10)
+	expectResults(t, producer, 10, 0)
 
 
 	closeProducer(t, producer)
 	closeProducer(t, producer)
 	leader1.Close()
 	leader1.Close()
@@ -168,6 +195,48 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) {
 	seedBroker.Close()
 	seedBroker.Close()
 }
 }
 
 
+func TestAsyncProducerCustomPartitioner(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	prodResponse := new(ProduceResponse)
+	prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader.Returns(prodResponse)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 2
+	config.Producer.Return.Successes = true
+	config.Producer.Partitioner = func(topic string) Partitioner {
+		p := make(testPartitioner)
+		go func() {
+			p.feed(0)
+			p <- nil
+			p <- nil
+			p <- nil
+			p.feed(0)
+		}()
+		return p
+	}
+	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 5; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	}
+	expectResults(t, producer, 2, 3)
+
+	closeProducer(t, producer)
+	leader.Close()
+	seedBroker.Close()
+}
+
 func TestAsyncProducerFailureRetry(t *testing.T) {
 func TestAsyncProducerFailureRetry(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	seedBroker := newMockBroker(t, 1)
 	leader1 := newMockBroker(t, 2)
 	leader1 := newMockBroker(t, 2)
@@ -203,14 +272,14 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
 	prodSuccess := new(ProduceResponse)
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader2.Returns(prodSuccess)
 	leader2.Returns(prodSuccess)
-	expectSuccesses(t, producer, 10)
+	expectResults(t, producer, 10, 0)
 	leader1.Close()
 	leader1.Close()
 
 
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
 	}
 	leader2.Returns(prodSuccess)
 	leader2.Returns(prodSuccess)
-	expectSuccesses(t, producer, 10)
+	expectResults(t, producer, 10, 0)
 
 
 	leader2.Close()
 	leader2.Close()
 	closeProducer(t, producer)
 	closeProducer(t, producer)
@@ -245,7 +314,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
 	prodSuccess := new(ProduceResponse)
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 	leader.Returns(prodSuccess)
-	expectSuccesses(t, producer, 10)
+	expectResults(t, producer, 10, 0)
 	seedBroker.Close()
 	seedBroker.Close()
 	leader.Close()
 	leader.Close()
 
 
@@ -288,7 +357,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 	prodSuccess := new(ProduceResponse)
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader2.Returns(prodSuccess)
 	leader2.Returns(prodSuccess)
-	expectSuccesses(t, producer, 10)
+	expectResults(t, producer, 10, 0)
 	seedBroker.Close()
 	seedBroker.Close()
 	leader2.Close()
 	leader2.Close()
 
 
@@ -336,13 +405,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
 	prodSuccess := new(ProduceResponse)
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader2.Returns(prodSuccess)
 	leader2.Returns(prodSuccess)
-	expectSuccesses(t, producer, 10)
+	expectResults(t, producer, 10, 0)
 
 
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
 	}
 	leader2.Returns(prodSuccess)
 	leader2.Returns(prodSuccess)
-	expectSuccesses(t, producer, 10)
+	expectResults(t, producer, 10, 0)
 
 
 	seedBroker.Close()
 	seedBroker.Close()
 	leader1.Close()
 	leader1.Close()
@@ -400,7 +469,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) {
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 	leader.Returns(prodSuccess)
 
 
-	expectSuccesses(t, producer, 10)
+	expectResults(t, producer, 10, 0)
 
 
 	leader.Close()
 	leader.Close()
 	seedBroker.Close()
 	seedBroker.Close()
@@ -433,14 +502,14 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
 	prodSuccess := new(ProduceResponse)
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 	leader.Returns(prodSuccess)
-	expectSuccesses(t, producer, 1)
+	expectResults(t, producer, 1, 0)
 
 
 	// prime partition 1
 	// prime partition 1
 	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	prodSuccess = new(ProduceResponse)
 	prodSuccess = new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
 	prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
 	leader.Returns(prodSuccess)
 	leader.Returns(prodSuccess)
-	expectSuccesses(t, producer, 1)
+	expectResults(t, producer, 1, 0)
 
 
 	// reboot the broker (the producer will get EOF on its existing connection)
 	// reboot the broker (the producer will get EOF on its existing connection)
 	leader.Close()
 	leader.Close()
@@ -456,7 +525,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
 	prodSuccess = new(ProduceResponse)
 	prodSuccess = new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 	leader.Returns(prodSuccess)
-	expectSuccesses(t, producer, 1)
+	expectResults(t, producer, 1, 0)
 
 
 	// shutdown
 	// shutdown
 	closeProducer(t, producer)
 	closeProducer(t, producer)
@@ -493,7 +562,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
 		prodSuccess := new(ProduceResponse)
 		prodSuccess := new(ProduceResponse)
 		prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
 		prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
 		leader.Returns(prodSuccess)
 		leader.Returns(prodSuccess)
-		expectSuccesses(t, producer, 5)
+		expectResults(t, producer, 5, 0)
 	}
 	}
 
 
 	// send more messages on partition 0
 	// send more messages on partition 0
@@ -511,14 +580,14 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
 	prodSuccess := new(ProduceResponse)
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 	leader.Returns(prodSuccess)
-	expectSuccesses(t, producer, 5)
+	expectResults(t, producer, 5, 0)
 
 
 	// put five more through
 	// put five more through
 	for i := 0; i < 5; i++ {
 	for i := 0; i < 5; i++ {
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
 	}
 	}
 	leader.Returns(prodSuccess)
 	leader.Returns(prodSuccess)
-	expectSuccesses(t, producer, 5)
+	expectResults(t, producer, 5, 0)
 
 
 	// shutdown
 	// shutdown
 	closeProducer(t, producer)
 	closeProducer(t, producer)
@@ -564,7 +633,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) {
 	prodSuccess := new(ProduceResponse)
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 	leader.Returns(prodSuccess)
-	expectSuccesses(t, producer, 10)
+	expectResults(t, producer, 10, 0)
 
 
 	seedBroker.Close()
 	seedBroker.Close()
 	leader.Close()
 	leader.Close()