|
|
@@ -30,6 +30,22 @@ func closeProducer(t *testing.T, p AsyncProducer) {
|
|
|
wg.Wait()
|
|
|
}
|
|
|
|
|
|
+func expectSuccesses(t *testing.T, p AsyncProducer, successes int) {
|
|
|
+ for i := 0; i < successes; i++ {
|
|
|
+ select {
|
|
|
+ case msg := <-p.Errors():
|
|
|
+ t.Error(msg.Err)
|
|
|
+ if msg.Msg.flags != 0 {
|
|
|
+ t.Error("Message had flags set")
|
|
|
+ }
|
|
|
+ case msg := <-p.Successes():
|
|
|
+ if msg.flags != 0 {
|
|
|
+ t.Error("Message had flags set")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func TestAsyncProducer(t *testing.T) {
|
|
|
seedBroker := newMockBroker(t, 1)
|
|
|
leader := newMockBroker(t, 2)
|
|
|
@@ -103,19 +119,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) {
|
|
|
for i := 0; i < 5; i++ {
|
|
|
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
}
|
|
|
- for i := 0; i < 5; i++ {
|
|
|
- select {
|
|
|
- case msg := <-producer.Errors():
|
|
|
- t.Error(msg.Err)
|
|
|
- if msg.Msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- case msg := <-producer.Successes():
|
|
|
- if msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ expectSuccesses(t, producer, 5)
|
|
|
}
|
|
|
|
|
|
closeProducer(t, producer)
|
|
|
@@ -155,19 +159,7 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) {
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
}
|
|
|
- for i := 0; i < 10; i++ {
|
|
|
- select {
|
|
|
- case msg := <-producer.Errors():
|
|
|
- t.Error(msg.Err)
|
|
|
- if msg.Msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- case msg := <-producer.Successes():
|
|
|
- if msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ expectSuccesses(t, producer, 10)
|
|
|
|
|
|
closeProducer(t, producer)
|
|
|
leader1.Close()
|
|
|
@@ -210,38 +202,14 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
|
|
|
prodSuccess := new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
leader2.Returns(prodSuccess)
|
|
|
- for i := 0; i < 10; i++ {
|
|
|
- select {
|
|
|
- case msg := <-producer.Errors():
|
|
|
- t.Error(msg.Err)
|
|
|
- if msg.Msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- case msg := <-producer.Successes():
|
|
|
- if msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ expectSuccesses(t, producer, 10)
|
|
|
leader1.Close()
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
}
|
|
|
leader2.Returns(prodSuccess)
|
|
|
- for i := 0; i < 10; i++ {
|
|
|
- select {
|
|
|
- case msg := <-producer.Errors():
|
|
|
- t.Error(msg.Err)
|
|
|
- if msg.Msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- case msg := <-producer.Successes():
|
|
|
- if msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ expectSuccesses(t, producer, 10)
|
|
|
|
|
|
leader2.Close()
|
|
|
closeProducer(t, producer)
|
|
|
@@ -276,19 +244,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
|
|
|
prodSuccess := new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
leader.Returns(prodSuccess)
|
|
|
- for i := 0; i < 10; i++ {
|
|
|
- select {
|
|
|
- case msg := <-producer.Errors():
|
|
|
- t.Error(msg.Err)
|
|
|
- if msg.Msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- case msg := <-producer.Successes():
|
|
|
- if msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ expectSuccesses(t, producer, 10)
|
|
|
seedBroker.Close()
|
|
|
leader.Close()
|
|
|
|
|
|
@@ -331,19 +287,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
|
|
|
prodSuccess := new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
leader2.Returns(prodSuccess)
|
|
|
- for i := 0; i < 10; i++ {
|
|
|
- select {
|
|
|
- case msg := <-producer.Errors():
|
|
|
- t.Error(msg.Err)
|
|
|
- if msg.Msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- case msg := <-producer.Successes():
|
|
|
- if msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ expectSuccesses(t, producer, 10)
|
|
|
seedBroker.Close()
|
|
|
leader2.Close()
|
|
|
|
|
|
@@ -391,37 +335,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
|
|
|
prodSuccess := new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
leader2.Returns(prodSuccess)
|
|
|
- for i := 0; i < 10; i++ {
|
|
|
- select {
|
|
|
- case msg := <-producer.Errors():
|
|
|
- t.Error(msg.Err)
|
|
|
- if msg.Msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- case msg := <-producer.Successes():
|
|
|
- if msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ expectSuccesses(t, producer, 10)
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
}
|
|
|
leader2.Returns(prodSuccess)
|
|
|
- for i := 0; i < 10; i++ {
|
|
|
- select {
|
|
|
- case msg := <-producer.Errors():
|
|
|
- t.Error(msg.Err)
|
|
|
- if msg.Msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- case msg := <-producer.Successes():
|
|
|
- if msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ expectSuccesses(t, producer, 10)
|
|
|
|
|
|
seedBroker.Close()
|
|
|
leader1.Close()
|
|
|
@@ -479,13 +399,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) {
|
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
leader.Returns(prodSuccess)
|
|
|
|
|
|
- for i := 0; i < 10; i++ {
|
|
|
- select {
|
|
|
- case msg := <-producer.Errors():
|
|
|
- t.Error(msg.Err)
|
|
|
- case <-producer.Successes():
|
|
|
- }
|
|
|
- }
|
|
|
+ expectSuccesses(t, producer, 10)
|
|
|
|
|
|
leader.Close()
|
|
|
seedBroker.Close()
|
|
|
@@ -518,22 +432,14 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
|
|
|
prodSuccess := new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
leader.Returns(prodSuccess)
|
|
|
- select {
|
|
|
- case msg := <-producer.Errors():
|
|
|
- t.Error(msg.Err)
|
|
|
- case <-producer.Successes():
|
|
|
- }
|
|
|
+ expectSuccesses(t, producer, 1)
|
|
|
|
|
|
// prime partition 1
|
|
|
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
prodSuccess = new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
|
|
|
leader.Returns(prodSuccess)
|
|
|
- select {
|
|
|
- case msg := <-producer.Errors():
|
|
|
- t.Error(msg.Err)
|
|
|
- case <-producer.Successes():
|
|
|
- }
|
|
|
+ expectSuccesses(t, producer, 1)
|
|
|
|
|
|
// reboot the broker (the producer will get EOF on its existing connection)
|
|
|
leader.Close()
|
|
|
@@ -549,11 +455,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
|
|
|
prodSuccess = new(ProduceResponse)
|
|
|
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
leader.Returns(prodSuccess)
|
|
|
- select {
|
|
|
- case msg := <-producer.Errors():
|
|
|
- t.Error(msg.Err)
|
|
|
- case <-producer.Successes():
|
|
|
- }
|
|
|
+ expectSuccesses(t, producer, 1)
|
|
|
|
|
|
// shutdown
|
|
|
closeProducer(t, producer)
|