|
|
@@ -1,6 +1,7 @@
|
|
|
package sarama
|
|
|
|
|
|
import (
|
|
|
+ "errors"
|
|
|
"log"
|
|
|
"os"
|
|
|
"os/signal"
|
|
|
@@ -31,22 +32,48 @@ func closeProducer(t *testing.T, p AsyncProducer) {
|
|
|
wg.Wait()
|
|
|
}
|
|
|
|
|
|
-func expectSuccesses(t *testing.T, p AsyncProducer, successes int) {
|
|
|
- for i := 0; i < successes; i++ {
|
|
|
+func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
|
|
|
+ for successes > 0 || errors > 0 {
|
|
|
select {
|
|
|
case msg := <-p.Errors():
|
|
|
- t.Error(msg.Err)
|
|
|
if msg.Msg.flags != 0 {
|
|
|
t.Error("Message had flags set")
|
|
|
}
|
|
|
+ errors--
|
|
|
+ if errors < 0 {
|
|
|
+ t.Error(msg.Err)
|
|
|
+ }
|
|
|
case msg := <-p.Successes():
|
|
|
if msg.flags != 0 {
|
|
|
t.Error("Message had flags set")
|
|
|
}
|
|
|
+ successes--
|
|
|
+ if successes < 0 {
|
|
|
+ t.Error("Too many successes")
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+type testPartitioner chan *int32
|
|
|
+
|
|
|
+func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
|
|
|
+ part := <-p
|
|
|
+ if part == nil {
|
|
|
+ return 0, errors.New("BOOM")
|
|
|
+ }
|
|
|
+
|
|
|
+ return *part, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (p testPartitioner) RequiresConsistency() bool {
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+func (p testPartitioner) feed(partition int32) {
|
|
|
+ p <- &partition
|
|
|
+}
|
|
|
+
|
|
|
func TestAsyncProducer(t *testing.T) {
|
|
|
seedBroker := newMockBroker(t, 1)
|
|
|
leader := newMockBroker(t, 2)
|
|
|
@@ -120,7 +147,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) {
|
|
|
for i := 0; i < 5; i++ {
|
|
|
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
}
|
|
|
- expectSuccesses(t, producer, 5)
|
|
|
+ expectResults(t, producer, 5, 0)
|
|
|
}
|
|
|
|
|
|
closeProducer(t, producer)
|
|
|
@@ -160,7 +187,7 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) {
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
}
|
|
|
- expectSuccesses(t, producer, 10)
|
|
|
+ expectResults(t, producer, 10, 0)
|
|
|
|
|
|
closeProducer(t, producer)
|
|
|
leader1.Close()
|
|
|
@@ -168,6 +195,48 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) {
|
|
|
seedBroker.Close()
|
|
|
}
|
|
|
|
|
|
+func TestAsyncProducerCustomPartitioner(t *testing.T) {
|
|
|
+ seedBroker := newMockBroker(t, 1)
|
|
|
+ leader := newMockBroker(t, 2)
|
|
|
+
|
|
|
+ metadataResponse := new(MetadataResponse)
|
|
|
+ metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
+
|
|
|
+ prodResponse := new(ProduceResponse)
|
|
|
+ prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
+ leader.Returns(prodResponse)
|
|
|
+
|
|
|
+ config := NewConfig()
|
|
|
+ config.Producer.Flush.Messages = 2
|
|
|
+ config.Producer.Return.Successes = true
|
|
|
+ config.Producer.Partitioner = func(topic string) Partitioner {
|
|
|
+ p := make(testPartitioner)
|
|
|
+ go func() {
|
|
|
+ p.feed(0)
|
|
|
+ p <- nil
|
|
|
+ p <- nil
|
|
|
+ p <- nil
|
|
|
+ p.feed(0)
|
|
|
+ }()
|
|
|
+ return p
|
|
|
+ }
|
|
|
+ producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ for i := 0; i < 5; i++ {
|
|
|
+ producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
+ }
|
|
|
+ expectResults(t, producer, 2, 3)
|
|
|
+
|
|
|
+ closeProducer(t, producer)
|
|
|
+ leader.Close()
|
|
|
+ seedBroker.Close()
|
|
|
+}
|
|
|
+
|
|
|
func TestAsyncProducerFailureRetry(t *testing.T) {
|
|
|
seedBroker := newMockBroker(t, 1)
|
|
|
leader1 := newMockBroker(t, 2)
|
|
|
@@ -203,14 +272,14 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
|
|
|
prodSuccess := new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
leader2.Returns(prodSuccess)
|
|
|
- expectSuccesses(t, producer, 10)
|
|
|
+ expectResults(t, producer, 10, 0)
|
|
|
leader1.Close()
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
}
|
|
|
leader2.Returns(prodSuccess)
|
|
|
- expectSuccesses(t, producer, 10)
|
|
|
+ expectResults(t, producer, 10, 0)
|
|
|
|
|
|
leader2.Close()
|
|
|
closeProducer(t, producer)
|
|
|
@@ -245,7 +314,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
|
|
|
prodSuccess := new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
leader.Returns(prodSuccess)
|
|
|
- expectSuccesses(t, producer, 10)
|
|
|
+ expectResults(t, producer, 10, 0)
|
|
|
seedBroker.Close()
|
|
|
leader.Close()
|
|
|
|
|
|
@@ -288,7 +357,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
|
|
|
prodSuccess := new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
leader2.Returns(prodSuccess)
|
|
|
- expectSuccesses(t, producer, 10)
|
|
|
+ expectResults(t, producer, 10, 0)
|
|
|
seedBroker.Close()
|
|
|
leader2.Close()
|
|
|
|
|
|
@@ -336,13 +405,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
|
|
|
prodSuccess := new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
leader2.Returns(prodSuccess)
|
|
|
- expectSuccesses(t, producer, 10)
|
|
|
+ expectResults(t, producer, 10, 0)
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
}
|
|
|
leader2.Returns(prodSuccess)
|
|
|
- expectSuccesses(t, producer, 10)
|
|
|
+ expectResults(t, producer, 10, 0)
|
|
|
|
|
|
seedBroker.Close()
|
|
|
leader1.Close()
|
|
|
@@ -400,7 +469,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) {
|
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
leader.Returns(prodSuccess)
|
|
|
|
|
|
- expectSuccesses(t, producer, 10)
|
|
|
+ expectResults(t, producer, 10, 0)
|
|
|
|
|
|
leader.Close()
|
|
|
seedBroker.Close()
|
|
|
@@ -433,14 +502,14 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
|
|
|
prodSuccess := new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
leader.Returns(prodSuccess)
|
|
|
- expectSuccesses(t, producer, 1)
|
|
|
+ expectResults(t, producer, 1, 0)
|
|
|
|
|
|
// prime partition 1
|
|
|
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
prodSuccess = new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
|
|
|
leader.Returns(prodSuccess)
|
|
|
- expectSuccesses(t, producer, 1)
|
|
|
+ expectResults(t, producer, 1, 0)
|
|
|
|
|
|
// reboot the broker (the producer will get EOF on its existing connection)
|
|
|
leader.Close()
|
|
|
@@ -456,7 +525,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
|
|
|
prodSuccess = new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
leader.Returns(prodSuccess)
|
|
|
- expectSuccesses(t, producer, 1)
|
|
|
+ expectResults(t, producer, 1, 0)
|
|
|
|
|
|
// shutdown
|
|
|
closeProducer(t, producer)
|
|
|
@@ -493,7 +562,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
|
|
|
prodSuccess := new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
|
|
|
leader.Returns(prodSuccess)
|
|
|
- expectSuccesses(t, producer, 5)
|
|
|
+ expectResults(t, producer, 5, 0)
|
|
|
}
|
|
|
|
|
|
// send more messages on partition 0
|
|
|
@@ -511,14 +580,14 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
|
|
|
prodSuccess := new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
leader.Returns(prodSuccess)
|
|
|
- expectSuccesses(t, producer, 5)
|
|
|
+ expectResults(t, producer, 5, 0)
|
|
|
|
|
|
// put five more through
|
|
|
for i := 0; i < 5; i++ {
|
|
|
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
|
|
|
}
|
|
|
leader.Returns(prodSuccess)
|
|
|
- expectSuccesses(t, producer, 5)
|
|
|
+ expectResults(t, producer, 5, 0)
|
|
|
|
|
|
// shutdown
|
|
|
closeProducer(t, producer)
|
|
|
@@ -564,7 +633,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) {
|
|
|
prodSuccess := new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
leader.Returns(prodSuccess)
|
|
|
- expectSuccesses(t, producer, 10)
|
|
|
+ expectResults(t, producer, 10, 0)
|
|
|
|
|
|
seedBroker.Close()
|
|
|
leader.Close()
|