|
@@ -580,20 +580,23 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
|
|
|
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
|
|
|
leader.Returns(prodNotLeader)
|
|
|
|
|
|
+ time.Sleep(50 * time.Millisecond)
|
|
|
+
|
|
|
+ leader.SetHandlerByMap(map[string]MockResponse{
|
|
|
+ "ProduceRequest": newMockProduceResponse(t).
|
|
|
+ SetError("my_topic", 0, ErrNoError),
|
|
|
+ })
|
|
|
+
|
|
|
|
|
|
seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
|
|
|
- prodSuccess := new(ProduceResponse)
|
|
|
- prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
- leader.Returns(prodSuccess)
|
|
|
expectResults(t, producer, 5, 0)
|
|
|
|
|
|
|
|
|
for i := 0; i < 5; i++ {
|
|
|
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
|
|
|
}
|
|
|
- leader.Returns(prodSuccess)
|
|
|
expectResults(t, producer, 5, 0)
|
|
|
|
|
|
|