|
|
@@ -8,6 +8,26 @@ import (
|
|
|
|
|
|
const TestMessage = "ABC THE MESSAGE"
|
|
|
|
|
|
+func closeProducer(t *testing.T, p *Producer) {
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ p.AsyncClose()
|
|
|
+
|
|
|
+ wg.Add(2)
|
|
|
+ go func() {
|
|
|
+ for _ = range p.Successes() {
|
|
|
+ t.Error("Unexpected message on Successes()")
|
|
|
+ }
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ go func() {
|
|
|
+ for msg := range p.Errors() {
|
|
|
+ t.Error(msg.Err)
|
|
|
+ }
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ wg.Wait()
|
|
|
+}
|
|
|
+
|
|
|
func TestDefaultProducerConfigValidates(t *testing.T) {
|
|
|
config := NewProducerConfig()
|
|
|
if err := config.Validate(); err != nil {
|
|
|
@@ -144,7 +164,7 @@ func TestProducer(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- safeClose(t, producer)
|
|
|
+ closeProducer(t, producer)
|
|
|
safeClose(t, client)
|
|
|
leader.Close()
|
|
|
seedBroker.Close()
|
|
|
@@ -197,7 +217,7 @@ func TestProducerMultipleFlushes(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- safeClose(t, producer)
|
|
|
+ closeProducer(t, producer)
|
|
|
safeClose(t, client)
|
|
|
leader.Close()
|
|
|
seedBroker.Close()
|
|
|
@@ -254,7 +274,7 @@ func TestProducerMultipleBrokers(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- safeClose(t, producer)
|
|
|
+ closeProducer(t, producer)
|
|
|
safeClose(t, client)
|
|
|
leader1.Close()
|
|
|
leader0.Close()
|
|
|
@@ -335,7 +355,7 @@ func TestProducerFailureRetry(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
leader2.Close()
|
|
|
- safeClose(t, producer)
|
|
|
+ closeProducer(t, producer)
|
|
|
safeClose(t, client)
|
|
|
}
|
|
|
|
|
|
@@ -389,7 +409,7 @@ func TestProducerBrokerBounce(t *testing.T) {
|
|
|
seedBroker.Close()
|
|
|
leader.Close()
|
|
|
|
|
|
- safeClose(t, producer)
|
|
|
+ closeProducer(t, producer)
|
|
|
safeClose(t, client)
|
|
|
}
|
|
|
|
|
|
@@ -450,7 +470,7 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
|
|
|
seedBroker.Close()
|
|
|
leader2.Close()
|
|
|
|
|
|
- safeClose(t, producer)
|
|
|
+ closeProducer(t, producer)
|
|
|
safeClose(t, client)
|
|
|
}
|
|
|
|
|
|
@@ -535,7 +555,7 @@ func TestProducerMultipleRetries(t *testing.T) {
|
|
|
seedBroker.Close()
|
|
|
leader1.Close()
|
|
|
leader2.Close()
|
|
|
- safeClose(t, producer)
|
|
|
+ closeProducer(t, producer)
|
|
|
safeClose(t, client)
|
|
|
}
|
|
|
|