producer: bugfix for aggregators getting stuck

In circumstances where Flush.Messages and/or Flush.Bytes were set but
Flush.Frequency was not, the producer's aggregator could get stuck on a retry
because a metadata-only chaser message would not be enough on its own to trigger
a flush, and so it would sit in limbo forever.

Always trigger a flush in the aggregator when the message is a chaser. This has
the additional benefit of reducing retry latency when Flush.Frequency *is* set.

Add a test for this case.
Evan Huus, 10 years ago
parent commit a381b3667d
3 changed files with 65 additions and 0 deletions
  1. CHANGELOG.md (+2, -0)
  2. async_producer.go (+1, -0)
  3. async_producer_test.go (+62, -0)

+ 2 - 0
CHANGELOG.md

@@ -5,6 +5,8 @@
 Bug Fixes:
  - Fix the producer's internal reference counting in certain unusual scenarios
    ([#367](https://github.com/Shopify/sarama/pull/367)).
+ - Fix a condition where the producer's internal control messages could have
+   gotten stuck ([#368](https://github.com/Shopify/sarama/pull/368)).
 
 #### Version 1.0.0 (2015-03-17)
 

+ 1 - 0
async_producer.go

@@ -458,6 +458,7 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
 			bytesAccumulated += msg.byteSize()
 
 			if defaultFlush ||
+				msg.flags&chaser == chaser ||
 				(p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) ||
 				(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
 				doFlush = flusher
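
Condensed to its essentials, the decision this hunk changes can be read as a standalone predicate. The sketch below is illustrative only, not sarama's code: it drops defaultFlush and the real flagSet/chaser types, but shows why a metadata-only chaser could previously never satisfy either threshold on its own.

package main

import "fmt"

// shouldFlush mirrors the aggregator's condition after this change:
// a chaser always flushes; otherwise only the count and byte thresholds apply.
func shouldFlush(isChaser bool, buffered, bytesAccumulated, flushMessages, flushBytes int) bool {
	return isChaser ||
		(flushMessages > 0 && buffered >= flushMessages) ||
		(flushBytes > 0 && bytesAccumulated >= flushBytes)
}

func main() {
	// A lone zero-byte chaser with Flush.Messages = 5 and Flush.Bytes unset:
	// without the isChaser clause this would be false, and with Flush.Frequency
	// unset there is no timer to ever flush it.
	fmt.Println(shouldFlush(true, 1, 0, 5, 0)) // prints: true
}
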

+ 62 - 0
async_producer_test.go

@@ -463,6 +463,68 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
 	leader.Close()
 }
 
+func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 5
+	config.Producer.Return.Successes = true
+	config.Producer.Retry.Backoff = 0
+	config.Producer.Retry.Max = 1
+	config.Producer.Partitioner = NewManualPartitioner
+	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// prime partitions
+	for p := int32(0); p < 2; p++ {
+		for i := 0; i < 5; i++ {
+			producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
+		}
+		prodSuccess := new(ProduceResponse)
+		prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
+		leader.Returns(prodSuccess)
+		expectSuccesses(t, producer, 5)
+	}
+
+	// send more messages on partition 0
+	for i := 0; i < 5; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
+	}
+	prodNotLeader := new(ProduceResponse)
+	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
+	leader.Returns(prodNotLeader)
+
+	// tell partition 0 to go to that broker again
+	seedBroker.Returns(metadataResponse)
+
+	// succeed this time
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader.Returns(prodSuccess)
+	expectSuccesses(t, producer, 5)
+
+	// put five more through
+	for i := 0; i < 5; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
+	}
+	leader.Returns(prodSuccess)
+	expectSuccesses(t, producer, 5)
+
+	// shutdown
+	closeProducer(t, producer)
+	seedBroker.Close()
+	leader.Close()
+}
+
 // This example shows how to use the producer while simultaneously
 // reading the Errors channel to know about any failures.
 func ExampleAsyncProducer_select() {
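
The configuration shape that exposed the original hang is the same as in the test above: count- or byte-based flushing with no Flush.Frequency. A minimal sketch from an application's point of view (values arbitrary, assuming the Shopify/sarama import path used elsewhere in this repository):

package main

import "github.com/Shopify/sarama"

func main() {
	// Count-based flushing only, mirroring the test above.
	config := sarama.NewConfig()
	config.Producer.Flush.Messages = 5
	// Flush.Frequency is deliberately left at its zero value: before this fix,
	// nothing time-based would ever pick up a lone chaser after a retry.
	config.Producer.Retry.Max = 1
	_ = config
}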