|
|
@@ -100,8 +100,6 @@ func TestConcurrentSimpleProducer(t *testing.T) {
|
|
|
func TestProducer(t *testing.T) {
|
|
|
broker1 := NewMockBroker(t, 1)
|
|
|
broker2 := NewMockBroker(t, 2)
|
|
|
- defer broker1.Close()
|
|
|
- defer broker2.Close()
|
|
|
|
|
|
response1 := new(MetadataResponse)
|
|
|
response1.AddBroker(broker2.Addr(), broker2.BrokerID())
|
|
|
@@ -116,7 +114,6 @@ func TestProducer(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer safeClose(t, client)
|
|
|
|
|
|
config := NewProducerConfig()
|
|
|
config.FlushMsgCount = 10
|
|
|
@@ -125,7 +122,6 @@ func TestProducer(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer safeClose(t, producer)
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
|
|
|
@@ -146,13 +142,16 @@ func TestProducer(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ safeClose(t, producer)
|
|
|
+ safeClose(t, client)
|
|
|
+ broker2.Close()
|
|
|
+ broker1.Close()
|
|
|
}
|
|
|
|
|
|
func TestProducerMultipleFlushes(t *testing.T) {
|
|
|
broker1 := NewMockBroker(t, 1)
|
|
|
broker2 := NewMockBroker(t, 2)
|
|
|
- defer broker1.Close()
|
|
|
- defer broker2.Close()
|
|
|
|
|
|
response1 := new(MetadataResponse)
|
|
|
response1.AddBroker(broker2.Addr(), broker2.BrokerID())
|
|
|
@@ -169,7 +168,6 @@ func TestProducerMultipleFlushes(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer safeClose(t, client)
|
|
|
|
|
|
config := NewProducerConfig()
|
|
|
config.FlushMsgCount = 5
|
|
|
@@ -178,7 +176,6 @@ func TestProducerMultipleFlushes(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer producer.Close()
|
|
|
|
|
|
for flush := 0; flush < 3; flush++ {
|
|
|
for i := 0; i < 5; i++ {
|
|
|
@@ -198,15 +195,17 @@ func TestProducerMultipleFlushes(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ safeClose(t, producer)
|
|
|
+ safeClose(t, client)
|
|
|
+ broker2.Close()
|
|
|
+ broker1.Close()
|
|
|
}
|
|
|
|
|
|
func TestProducerMultipleBrokers(t *testing.T) {
|
|
|
broker1 := NewMockBroker(t, 1)
|
|
|
broker2 := NewMockBroker(t, 2)
|
|
|
broker3 := NewMockBroker(t, 3)
|
|
|
- defer broker1.Close()
|
|
|
- defer broker2.Close()
|
|
|
- defer broker3.Close()
|
|
|
|
|
|
response1 := new(MetadataResponse)
|
|
|
response1.AddBroker(broker2.Addr(), broker2.BrokerID())
|
|
|
@@ -227,7 +226,6 @@ func TestProducerMultipleBrokers(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer safeClose(t, client)
|
|
|
|
|
|
config := NewProducerConfig()
|
|
|
config.FlushMsgCount = 5
|
|
|
@@ -237,7 +235,6 @@ func TestProducerMultipleBrokers(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer safeClose(t, producer)
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
@@ -255,6 +252,12 @@ func TestProducerMultipleBrokers(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ safeClose(t, producer)
|
|
|
+ safeClose(t, client)
|
|
|
+ broker3.Close()
|
|
|
+ broker2.Close()
|
|
|
+ broker1.Close()
|
|
|
}
|
|
|
|
|
|
func TestProducerFailureRetry(t *testing.T) {
|