ソースを参照

Merge pull request #368 from Shopify/flush-retry-recovery

producer: bugfix for aggregators getting stuck
Evan Huus 10 年 前
コミット
963015ec94
3 ファイル変更92 行追加125 行削除
  1. 2 0
      CHANGELOG.md
  2. 1 0
      async_producer.go
  3. 89 125
      async_producer_test.go

+ 2 - 0
CHANGELOG.md

@@ -5,6 +5,8 @@
 Bug Fixes:
  - Fix the producer's internal reference counting in certain unusual scenarios
    ([#367](https://github.com/Shopify/sarama/pull/367)).
+ - Fix a condition where the producer's internal control messages could have
+   gotten stuck ([#368](https://github.com/Shopify/sarama/pull/368)).
 
 #### Version 1.0.0 (2015-03-17)
 

+ 1 - 0
async_producer.go

@@ -458,6 +458,7 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
 			bytesAccumulated += msg.byteSize()
 
 			if defaultFlush ||
+				msg.flags&chaser == chaser ||
 				(p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) ||
 				(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
 				doFlush = flusher

+ 89 - 125
async_producer_test.go

@@ -30,6 +30,22 @@ func closeProducer(t *testing.T, p AsyncProducer) {
 	wg.Wait()
 }
 
+func expectSuccesses(t *testing.T, p AsyncProducer, successes int) {
+	for i := 0; i < successes; i++ {
+		select {
+		case msg := <-p.Errors():
+			t.Error(msg.Err)
+			if msg.Msg.flags != 0 {
+				t.Error("Message had flags set")
+			}
+		case msg := <-p.Successes():
+			if msg.flags != 0 {
+				t.Error("Message had flags set")
+			}
+		}
+	}
+}
+
 func TestAsyncProducer(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 2)
@@ -103,19 +119,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) {
 		for i := 0; i < 5; i++ {
 			producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 		}
-		for i := 0; i < 5; i++ {
-			select {
-			case msg := <-producer.Errors():
-				t.Error(msg.Err)
-				if msg.Msg.flags != 0 {
-					t.Error("Message had flags set")
-				}
-			case msg := <-producer.Successes():
-				if msg.flags != 0 {
-					t.Error("Message had flags set")
-				}
-			}
-		}
+		expectSuccesses(t, producer, 5)
 	}
 
 	closeProducer(t, producer)
@@ -155,19 +159,7 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
-	for i := 0; i < 10; i++ {
-		select {
-		case msg := <-producer.Errors():
-			t.Error(msg.Err)
-			if msg.Msg.flags != 0 {
-				t.Error("Message had flags set")
-			}
-		case msg := <-producer.Successes():
-			if msg.flags != 0 {
-				t.Error("Message had flags set")
-			}
-		}
-	}
+	expectSuccesses(t, producer, 10)
 
 	closeProducer(t, producer)
 	leader1.Close()
@@ -210,38 +202,14 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader2.Returns(prodSuccess)
-	for i := 0; i < 10; i++ {
-		select {
-		case msg := <-producer.Errors():
-			t.Error(msg.Err)
-			if msg.Msg.flags != 0 {
-				t.Error("Message had flags set")
-			}
-		case msg := <-producer.Successes():
-			if msg.flags != 0 {
-				t.Error("Message had flags set")
-			}
-		}
-	}
+	expectSuccesses(t, producer, 10)
 	leader1.Close()
 
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
 	leader2.Returns(prodSuccess)
-	for i := 0; i < 10; i++ {
-		select {
-		case msg := <-producer.Errors():
-			t.Error(msg.Err)
-			if msg.Msg.flags != 0 {
-				t.Error("Message had flags set")
-			}
-		case msg := <-producer.Successes():
-			if msg.flags != 0 {
-				t.Error("Message had flags set")
-			}
-		}
-	}
+	expectSuccesses(t, producer, 10)
 
 	leader2.Close()
 	closeProducer(t, producer)
@@ -276,19 +244,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
-	for i := 0; i < 10; i++ {
-		select {
-		case msg := <-producer.Errors():
-			t.Error(msg.Err)
-			if msg.Msg.flags != 0 {
-				t.Error("Message had flags set")
-			}
-		case msg := <-producer.Successes():
-			if msg.flags != 0 {
-				t.Error("Message had flags set")
-			}
-		}
-	}
+	expectSuccesses(t, producer, 10)
 	seedBroker.Close()
 	leader.Close()
 
@@ -331,19 +287,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader2.Returns(prodSuccess)
-	for i := 0; i < 10; i++ {
-		select {
-		case msg := <-producer.Errors():
-			t.Error(msg.Err)
-			if msg.Msg.flags != 0 {
-				t.Error("Message had flags set")
-			}
-		case msg := <-producer.Successes():
-			if msg.flags != 0 {
-				t.Error("Message had flags set")
-			}
-		}
-	}
+	expectSuccesses(t, producer, 10)
 	seedBroker.Close()
 	leader2.Close()
 
@@ -391,37 +335,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader2.Returns(prodSuccess)
-	for i := 0; i < 10; i++ {
-		select {
-		case msg := <-producer.Errors():
-			t.Error(msg.Err)
-			if msg.Msg.flags != 0 {
-				t.Error("Message had flags set")
-			}
-		case msg := <-producer.Successes():
-			if msg.flags != 0 {
-				t.Error("Message had flags set")
-			}
-		}
-	}
+	expectSuccesses(t, producer, 10)
 
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
 	leader2.Returns(prodSuccess)
-	for i := 0; i < 10; i++ {
-		select {
-		case msg := <-producer.Errors():
-			t.Error(msg.Err)
-			if msg.Msg.flags != 0 {
-				t.Error("Message had flags set")
-			}
-		case msg := <-producer.Successes():
-			if msg.flags != 0 {
-				t.Error("Message had flags set")
-			}
-		}
-	}
+	expectSuccesses(t, producer, 10)
 
 	seedBroker.Close()
 	leader1.Close()
@@ -479,13 +399,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) {
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 
-	for i := 0; i < 10; i++ {
-		select {
-		case msg := <-producer.Errors():
-			t.Error(msg.Err)
-		case <-producer.Successes():
-		}
-	}
+	expectSuccesses(t, producer, 10)
 
 	leader.Close()
 	seedBroker.Close()
@@ -518,22 +432,14 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
-	select {
-	case msg := <-producer.Errors():
-		t.Error(msg.Err)
-	case <-producer.Successes():
-	}
+	expectSuccesses(t, producer, 1)
 
 	// prime partition 1
 	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	prodSuccess = new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
 	leader.Returns(prodSuccess)
-	select {
-	case msg := <-producer.Errors():
-		t.Error(msg.Err)
-	case <-producer.Successes():
-	}
+	expectSuccesses(t, producer, 1)
 
 	// reboot the broker (the producer will get EOF on its existing connection)
 	leader.Close()
@@ -549,11 +455,69 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
 	prodSuccess = new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
-	select {
-	case msg := <-producer.Errors():
-		t.Error(msg.Err)
-	case <-producer.Successes():
+	expectSuccesses(t, producer, 1)
+
+	// shutdown
+	closeProducer(t, producer)
+	seedBroker.Close()
+	leader.Close()
+}
+
+func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 5
+	config.Producer.Return.Successes = true
+	config.Producer.Retry.Backoff = 0
+	config.Producer.Retry.Max = 1
+	config.Producer.Partitioner = NewManualPartitioner
+	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// prime partitions
+	for p := int32(0); p < 2; p++ {
+		for i := 0; i < 5; i++ {
+			producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
+		}
+		prodSuccess := new(ProduceResponse)
+		prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
+		leader.Returns(prodSuccess)
+		expectSuccesses(t, producer, 5)
+	}
+
+	// send more messages on partition 0
+	for i := 0; i < 5; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
+	}
+	prodNotLeader := new(ProduceResponse)
+	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
+	leader.Returns(prodNotLeader)
+
+	// tell partition 0 to go to that broker again
+	seedBroker.Returns(metadataResponse)
+
+	// succeed this time
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader.Returns(prodSuccess)
+	expectSuccesses(t, producer, 5)
+
+	// put five more through
+	for i := 0; i < 5; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
 	}
+	leader.Returns(prodSuccess)
+	expectSuccesses(t, producer, 5)
 
 	// shutdown
 	closeProducer(t, producer)