|
|
@@ -35,12 +35,13 @@ func TestSimpleProducer(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
+ defer safeClose(t, client)
|
|
|
|
|
|
producer, err := NewSimpleProducer(client, "my_topic", nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer producer.Close()
|
|
|
+ defer safeClose(t, producer)
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
err = producer.SendMessage(nil, StringEncoder(TestMessage))
|
|
|
@@ -69,6 +70,7 @@ func TestProducer(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
+ defer safeClose(t, client)
|
|
|
|
|
|
config := NewProducerConfig()
|
|
|
config.FlushMsgCount = 10
|
|
|
@@ -77,7 +79,7 @@ func TestProducer(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer producer.Close()
|
|
|
+ defer safeClose(t, producer)
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
@@ -111,6 +113,7 @@ func TestProducerMultipleFlushes(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
+ defer safeClose(t, client)
|
|
|
|
|
|
config := NewProducerConfig()
|
|
|
config.FlushMsgCount = 5
|
|
|
@@ -161,6 +164,7 @@ func TestProducerMultipleBrokers(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
+ defer safeClose(t, client)
|
|
|
|
|
|
config := NewProducerConfig()
|
|
|
config.FlushMsgCount = 5
|
|
|
@@ -170,7 +174,7 @@ func TestProducerMultipleBrokers(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer producer.Close()
|
|
|
+ defer safeClose(t, producer)
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
@@ -199,6 +203,7 @@ func TestProducerFailureRetry(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
+ defer safeClose(t, client)
|
|
|
|
|
|
config := NewProducerConfig()
|
|
|
config.FlushMsgCount = 10
|
|
|
@@ -207,7 +212,7 @@ func TestProducerFailureRetry(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer producer.Close()
|
|
|
+ defer safeClose(t, producer)
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|