|
@@ -16,21 +16,21 @@ func TestDefaultProducerConfigValidates(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestSimpleProducer(t *testing.T) {
|
|
|
- broker1 := NewMockBroker(t, 1)
|
|
|
- broker2 := NewMockBroker(t, 2)
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ leader := NewMockBroker(t, 2)
|
|
|
|
|
|
- response1 := new(MetadataResponse)
|
|
|
- response1.AddBroker(broker2.Addr(), broker2.BrokerID())
|
|
|
- response1.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
|
|
|
- broker1.Returns(response1)
|
|
|
+ metadataResponse := new(MetadataResponse)
|
|
|
+ metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
- response2 := new(ProduceResponse)
|
|
|
- response2.AddTopicPartition("my_topic", 0, NoError)
|
|
|
+ prodSuccess := new(ProduceResponse)
|
|
|
+ prodSuccess.AddTopicPartition("my_topic", 0, NoError)
|
|
|
for i := 0; i < 10; i++ {
|
|
|
- broker2.Returns(response2)
|
|
|
+ leader.Returns(prodSuccess)
|
|
|
}
|
|
|
|
|
|
- client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
@@ -49,24 +49,24 @@ func TestSimpleProducer(t *testing.T) {
|
|
|
|
|
|
safeClose(t, producer)
|
|
|
safeClose(t, client)
|
|
|
- broker2.Close()
|
|
|
- broker1.Close()
|
|
|
+ leader.Close()
|
|
|
+ seedBroker.Close()
|
|
|
}
|
|
|
|
|
|
func TestConcurrentSimpleProducer(t *testing.T) {
|
|
|
- broker1 := NewMockBroker(t, 1)
|
|
|
- broker2 := NewMockBroker(t, 2)
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ leader := NewMockBroker(t, 2)
|
|
|
|
|
|
- response1 := new(MetadataResponse)
|
|
|
- response1.AddBroker(broker2.Addr(), broker2.BrokerID())
|
|
|
- response1.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
|
|
|
- broker1.Returns(response1)
|
|
|
+ metadataResponse := new(MetadataResponse)
|
|
|
+ metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
- response2 := new(ProduceResponse)
|
|
|
- response2.AddTopicPartition("my_topic", 0, NoError)
|
|
|
- broker2.Returns(response2)
|
|
|
+ prodSuccess := new(ProduceResponse)
|
|
|
+ prodSuccess.AddTopicPartition("my_topic", 0, NoError)
|
|
|
+ leader.Returns(prodSuccess)
|
|
|
|
|
|
- client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
@@ -94,24 +94,24 @@ func TestConcurrentSimpleProducer(t *testing.T) {
|
|
|
|
|
|
safeClose(t, producer)
|
|
|
safeClose(t, client)
|
|
|
- broker2.Close()
|
|
|
- broker1.Close()
|
|
|
+ leader.Close()
|
|
|
+ seedBroker.Close()
|
|
|
}
|
|
|
|
|
|
func TestProducer(t *testing.T) {
|
|
|
- broker1 := NewMockBroker(t, 1)
|
|
|
- broker2 := NewMockBroker(t, 2)
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ leader := NewMockBroker(t, 2)
|
|
|
|
|
|
- response1 := new(MetadataResponse)
|
|
|
- response1.AddBroker(broker2.Addr(), broker2.BrokerID())
|
|
|
- response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
|
|
|
- broker1.Returns(response1)
|
|
|
+ metadataResponse := new(MetadataResponse)
|
|
|
+ metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
- response2 := new(ProduceResponse)
|
|
|
- response2.AddTopicPartition("my_topic", 0, NoError)
|
|
|
- broker2.Returns(response2)
|
|
|
+ prodSuccess := new(ProduceResponse)
|
|
|
+ prodSuccess.AddTopicPartition("my_topic", 0, NoError)
|
|
|
+ leader.Returns(prodSuccess)
|
|
|
|
|
|
- client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
@@ -146,26 +146,26 @@ func TestProducer(t *testing.T) {
|
|
|
|
|
|
safeClose(t, producer)
|
|
|
safeClose(t, client)
|
|
|
- broker2.Close()
|
|
|
- broker1.Close()
|
|
|
+ leader.Close()
|
|
|
+ seedBroker.Close()
|
|
|
}
|
|
|
|
|
|
func TestProducerMultipleFlushes(t *testing.T) {
|
|
|
- broker1 := NewMockBroker(t, 1)
|
|
|
- broker2 := NewMockBroker(t, 2)
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ leader := NewMockBroker(t, 2)
|
|
|
|
|
|
- response1 := new(MetadataResponse)
|
|
|
- response1.AddBroker(broker2.Addr(), broker2.BrokerID())
|
|
|
- response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
|
|
|
- broker1.Returns(response1)
|
|
|
+ metadataResponse := new(MetadataResponse)
|
|
|
+ metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
- response2 := new(ProduceResponse)
|
|
|
- response2.AddTopicPartition("my_topic", 0, NoError)
|
|
|
- broker2.Returns(response2)
|
|
|
- broker2.Returns(response2)
|
|
|
- broker2.Returns(response2)
|
|
|
+ prodSuccess := new(ProduceResponse)
|
|
|
+ prodSuccess.AddTopicPartition("my_topic", 0, NoError)
|
|
|
+ leader.Returns(prodSuccess)
|
|
|
+ leader.Returns(prodSuccess)
|
|
|
+ leader.Returns(prodSuccess)
|
|
|
|
|
|
- client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
@@ -199,31 +199,31 @@ func TestProducerMultipleFlushes(t *testing.T) {
|
|
|
|
|
|
safeClose(t, producer)
|
|
|
safeClose(t, client)
|
|
|
- broker2.Close()
|
|
|
- broker1.Close()
|
|
|
+ leader.Close()
|
|
|
+ seedBroker.Close()
|
|
|
}
|
|
|
|
|
|
func TestProducerMultipleBrokers(t *testing.T) {
|
|
|
- broker1 := NewMockBroker(t, 1)
|
|
|
- broker2 := NewMockBroker(t, 2)
|
|
|
- broker3 := NewMockBroker(t, 3)
|
|
|
-
|
|
|
- response1 := new(MetadataResponse)
|
|
|
- response1.AddBroker(broker2.Addr(), broker2.BrokerID())
|
|
|
- response1.AddBroker(broker3.Addr(), broker3.BrokerID())
|
|
|
- response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
|
|
|
- response1.AddTopicPartition("my_topic", 1, broker3.BrokerID(), nil, nil, NoError)
|
|
|
- broker1.Returns(response1)
|
|
|
-
|
|
|
- response2 := new(ProduceResponse)
|
|
|
- response2.AddTopicPartition("my_topic", 0, NoError)
|
|
|
- broker2.Returns(response2)
|
|
|
-
|
|
|
- response3 := new(ProduceResponse)
|
|
|
- response3.AddTopicPartition("my_topic", 1, NoError)
|
|
|
- broker3.Returns(response3)
|
|
|
-
|
|
|
- client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ leader0 := NewMockBroker(t, 2)
|
|
|
+ leader1 := NewMockBroker(t, 3)
|
|
|
+
|
|
|
+ metadataResponse := new(MetadataResponse)
|
|
|
+ metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
|
|
|
+ metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, NoError)
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
+
|
|
|
+ prodResponse0 := new(ProduceResponse)
|
|
|
+ prodResponse0.AddTopicPartition("my_topic", 0, NoError)
|
|
|
+ leader0.Returns(prodResponse0)
|
|
|
+
|
|
|
+ prodResponse1 := new(ProduceResponse)
|
|
|
+ prodResponse1.AddTopicPartition("my_topic", 1, NoError)
|
|
|
+ leader1.Returns(prodResponse1)
|
|
|
+
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
@@ -256,22 +256,22 @@ func TestProducerMultipleBrokers(t *testing.T) {
|
|
|
|
|
|
safeClose(t, producer)
|
|
|
safeClose(t, client)
|
|
|
- broker3.Close()
|
|
|
- broker2.Close()
|
|
|
- broker1.Close()
|
|
|
+ leader1.Close()
|
|
|
+ leader0.Close()
|
|
|
+ seedBroker.Close()
|
|
|
}
|
|
|
|
|
|
func TestProducerFailureRetry(t *testing.T) {
|
|
|
- broker1 := NewMockBroker(t, 1)
|
|
|
- broker2 := NewMockBroker(t, 2)
|
|
|
- broker3 := NewMockBroker(t, 3)
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ leader1 := NewMockBroker(t, 2)
|
|
|
+ leader2 := NewMockBroker(t, 3)
|
|
|
|
|
|
- response1 := new(MetadataResponse)
|
|
|
- response1.AddBroker(broker2.Addr(), broker2.BrokerID())
|
|
|
- response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
|
|
|
- broker1.Returns(response1)
|
|
|
+ metadataLeader1 := new(MetadataResponse)
|
|
|
+ metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
|
|
|
+ metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataLeader1)
|
|
|
|
|
|
- client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
@@ -284,23 +284,23 @@ func TestProducerFailureRetry(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- broker1.Close()
|
|
|
+ seedBroker.Close()
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
}
|
|
|
- response2 := new(ProduceResponse)
|
|
|
- response2.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
|
|
|
- broker2.Returns(response2)
|
|
|
+ prodNotLeader := new(ProduceResponse)
|
|
|
+ prodNotLeader.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
|
|
|
+ leader1.Returns(prodNotLeader)
|
|
|
|
|
|
- response3 := new(MetadataResponse)
|
|
|
- response3.AddBroker(broker3.Addr(), broker3.BrokerID())
|
|
|
- response3.AddTopicPartition("my_topic", 0, broker3.BrokerID(), nil, nil, NoError)
|
|
|
- broker2.Returns(response3)
|
|
|
+ metadataLeader2 := new(MetadataResponse)
|
|
|
+ metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
|
|
|
+ metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, NoError)
|
|
|
+ leader1.Returns(metadataLeader2)
|
|
|
|
|
|
- response4 := new(ProduceResponse)
|
|
|
- response4.AddTopicPartition("my_topic", 0, NoError)
|
|
|
- broker3.Returns(response4)
|
|
|
+ prodSuccess := new(ProduceResponse)
|
|
|
+ prodSuccess.AddTopicPartition("my_topic", 0, NoError)
|
|
|
+ leader2.Returns(prodSuccess)
|
|
|
for i := 0; i < 10; i++ {
|
|
|
select {
|
|
|
case msg := <-producer.Errors():
|
|
@@ -314,12 +314,12 @@ func TestProducerFailureRetry(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- broker2.Close()
|
|
|
+ leader1.Close()
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
}
|
|
|
- broker3.Returns(response4)
|
|
|
+ leader2.Returns(prodSuccess)
|
|
|
for i := 0; i < 10; i++ {
|
|
|
select {
|
|
|
case msg := <-producer.Errors():
|
|
@@ -334,23 +334,22 @@ func TestProducerFailureRetry(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- broker3.Close()
|
|
|
+ leader2.Close()
|
|
|
safeClose(t, producer)
|
|
|
safeClose(t, client)
|
|
|
}
|
|
|
|
|
|
func TestProducerBrokerBounce(t *testing.T) {
|
|
|
- broker1 := NewMockBroker(t, 1)
|
|
|
- broker2 := NewMockBroker(t, 2)
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ leader := NewMockBroker(t, 2)
|
|
|
+ leaderAddr := leader.Addr()
|
|
|
|
|
|
- addr2 := broker2.Addr()
|
|
|
+ metadataResponse := new(MetadataResponse)
|
|
|
+ metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
- response1 := new(MetadataResponse)
|
|
|
- response1.AddBroker(addr2, broker2.BrokerID())
|
|
|
- response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
|
|
|
- broker1.Returns(response1)
|
|
|
-
|
|
|
- client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
@@ -367,13 +366,13 @@ func TestProducerBrokerBounce(t *testing.T) {
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
}
|
|
|
- broker2.Close()
|
|
|
- broker2 = NewMockBrokerAddr(t, 2, addr2)
|
|
|
- broker1.Returns(response1)
|
|
|
+ leader.Close()
|
|
|
+ leader = NewMockBrokerAddr(t, 2, leaderAddr)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
- response2 := new(ProduceResponse)
|
|
|
- response2.AddTopicPartition("my_topic", 0, NoError)
|
|
|
- broker2.Returns(response2)
|
|
|
+ prodSuccess := new(ProduceResponse)
|
|
|
+ prodSuccess.AddTopicPartition("my_topic", 0, NoError)
|
|
|
+ leader.Returns(prodSuccess)
|
|
|
for i := 0; i < 10; i++ {
|
|
|
select {
|
|
|
case msg := <-producer.Errors():
|
|
@@ -387,24 +386,24 @@ func TestProducerBrokerBounce(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- broker1.Close()
|
|
|
- broker2.Close()
|
|
|
+ seedBroker.Close()
|
|
|
+ leader.Close()
|
|
|
|
|
|
safeClose(t, producer)
|
|
|
safeClose(t, client)
|
|
|
}
|
|
|
|
|
|
func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
|
|
|
- broker1 := NewMockBroker(t, 1)
|
|
|
- broker2 := NewMockBroker(t, 2)
|
|
|
- broker3 := NewMockBroker(t, 3)
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ leader1 := NewMockBroker(t, 2)
|
|
|
+ leader2 := NewMockBroker(t, 3)
|
|
|
|
|
|
- response1 := new(MetadataResponse)
|
|
|
- response1.AddBroker(broker2.Addr(), broker2.BrokerID())
|
|
|
- response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
|
|
|
- broker1.Returns(response1)
|
|
|
+ metadataLeader1 := new(MetadataResponse)
|
|
|
+ metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
|
|
|
+ metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataLeader1)
|
|
|
|
|
|
- client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
@@ -422,19 +421,19 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
}
|
|
|
- broker2.Close()
|
|
|
- broker1.Returns(response1)
|
|
|
- broker1.Returns(response1)
|
|
|
+ leader1.Close()
|
|
|
+ seedBroker.Returns(metadataLeader1)
|
|
|
+ seedBroker.Returns(metadataLeader1)
|
|
|
|
|
|
-
|
|
|
- response2 := new(MetadataResponse)
|
|
|
- response2.AddBroker(broker3.Addr(), broker3.BrokerID())
|
|
|
- response2.AddTopicPartition("my_topic", 0, broker3.BrokerID(), nil, nil, NoError)
|
|
|
- broker1.Returns(response2)
|
|
|
+
|
|
|
+ metadataLeader2 := new(MetadataResponse)
|
|
|
+ metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
|
|
|
+ metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataLeader2)
|
|
|
|
|
|
- response3 := new(ProduceResponse)
|
|
|
- response3.AddTopicPartition("my_topic", 0, NoError)
|
|
|
- broker3.Returns(response3)
|
|
|
+ prodSuccess := new(ProduceResponse)
|
|
|
+ prodSuccess.AddTopicPartition("my_topic", 0, NoError)
|
|
|
+ leader2.Returns(prodSuccess)
|
|
|
for i := 0; i < 10; i++ {
|
|
|
select {
|
|
|
case msg := <-producer.Errors():
|
|
@@ -448,24 +447,24 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- broker1.Close()
|
|
|
- broker3.Close()
|
|
|
+ seedBroker.Close()
|
|
|
+ leader2.Close()
|
|
|
|
|
|
safeClose(t, producer)
|
|
|
safeClose(t, client)
|
|
|
}
|
|
|
|
|
|
func TestProducerMultipleRetries(t *testing.T) {
|
|
|
- broker1 := NewMockBroker(t, 1)
|
|
|
- broker2 := NewMockBroker(t, 2)
|
|
|
- broker3 := NewMockBroker(t, 3)
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ leader1 := NewMockBroker(t, 2)
|
|
|
+ leader2 := NewMockBroker(t, 3)
|
|
|
|
|
|
- response1 := new(MetadataResponse)
|
|
|
- response1.AddBroker(broker2.Addr(), broker2.BrokerID())
|
|
|
- response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
|
|
|
- broker1.Returns(response1)
|
|
|
+ metadataLeader1 := new(MetadataResponse)
|
|
|
+ metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
|
|
|
+ metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataLeader1)
|
|
|
|
|
|
- client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
@@ -483,24 +482,24 @@ func TestProducerMultipleRetries(t *testing.T) {
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
}
|
|
|
- response2 := new(ProduceResponse)
|
|
|
- response2.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
|
|
|
- broker2.Returns(response2)
|
|
|
-
|
|
|
- response3 := new(MetadataResponse)
|
|
|
- response3.AddBroker(broker3.Addr(), broker3.BrokerID())
|
|
|
- response3.AddTopicPartition("my_topic", 0, broker3.BrokerID(), nil, nil, NoError)
|
|
|
- broker1.Returns(response3)
|
|
|
- broker3.Returns(response2)
|
|
|
- broker1.Returns(response1)
|
|
|
- broker2.Returns(response2)
|
|
|
- broker1.Returns(response1)
|
|
|
- broker2.Returns(response2)
|
|
|
- broker1.Returns(response3)
|
|
|
-
|
|
|
- response4 := new(ProduceResponse)
|
|
|
- response4.AddTopicPartition("my_topic", 0, NoError)
|
|
|
- broker3.Returns(response4)
|
|
|
+ prodNotLeader := new(ProduceResponse)
|
|
|
+ prodNotLeader.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
|
|
|
+ leader1.Returns(prodNotLeader)
|
|
|
+
|
|
|
+ metadataLeader2 := new(MetadataResponse)
|
|
|
+ metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
|
|
|
+ metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataLeader2)
|
|
|
+ leader2.Returns(prodNotLeader)
|
|
|
+ seedBroker.Returns(metadataLeader1)
|
|
|
+ leader1.Returns(prodNotLeader)
|
|
|
+ seedBroker.Returns(metadataLeader1)
|
|
|
+ leader1.Returns(prodNotLeader)
|
|
|
+ seedBroker.Returns(metadataLeader2)
|
|
|
+
|
|
|
+ prodSuccess := new(ProduceResponse)
|
|
|
+ prodSuccess.AddTopicPartition("my_topic", 0, NoError)
|
|
|
+ leader2.Returns(prodSuccess)
|
|
|
for i := 0; i < 10; i++ {
|
|
|
select {
|
|
|
case msg := <-producer.Errors():
|
|
@@ -518,7 +517,7 @@ func TestProducerMultipleRetries(t *testing.T) {
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
}
|
|
|
- broker3.Returns(response4)
|
|
|
+ leader2.Returns(prodSuccess)
|
|
|
for i := 0; i < 10; i++ {
|
|
|
select {
|
|
|
case msg := <-producer.Errors():
|
|
@@ -533,9 +532,9 @@ func TestProducerMultipleRetries(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- broker1.Close()
|
|
|
- broker2.Close()
|
|
|
- broker3.Close()
|
|
|
+ seedBroker.Close()
|
|
|
+ leader1.Close()
|
|
|
+ leader2.Close()
|
|
|
safeClose(t, producer)
|
|
|
safeClose(t, client)
|
|
|
}
|