|
|
@@ -6,6 +6,7 @@ import (
|
|
|
"os/signal"
|
|
|
"sync"
|
|
|
"testing"
|
|
|
+ "time"
|
|
|
)
|
|
|
|
|
|
const TestMessage = "ABC THE MESSAGE"
|
|
|
@@ -525,6 +526,55 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
|
|
|
leader.Close()
|
|
|
}
|
|
|
|
|
|
+func TestAsyncProducerRetryShutdown(t *testing.T) {
|
|
|
+ seedBroker := newMockBroker(t, 1)
|
|
|
+ leader := newMockBroker(t, 2)
|
|
|
+
|
|
|
+ metadataLeader := new(MetadataResponse)
|
|
|
+ metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
|
|
|
+ seedBroker.Returns(metadataLeader)
|
|
|
+
|
|
|
+ config := NewConfig()
|
|
|
+ config.Producer.Flush.Messages = 10
|
|
|
+ config.Producer.Return.Successes = true
|
|
|
+ config.Producer.Retry.Backoff = 0
|
|
|
+ producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ for i := 0; i < 10; i++ {
|
|
|
+ producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
+ }
|
|
|
+ producer.AsyncClose()
|
|
|
+ time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in
|
|
|
+
|
|
|
+ producer.Input() <- &ProducerMessage{Topic: "FOO"}
|
|
|
+ if err := <-producer.Errors(); err.Err != ErrShuttingDown {
|
|
|
+ t.Error(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ prodNotLeader := new(ProduceResponse)
|
|
|
+ prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
|
|
|
+ leader.Returns(prodNotLeader)
|
|
|
+
|
|
|
+ seedBroker.Returns(metadataLeader)
|
|
|
+
|
|
|
+ prodSuccess := new(ProduceResponse)
|
|
|
+ prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
+ leader.Returns(prodSuccess)
|
|
|
+ expectSuccesses(t, producer, 10)
|
|
|
+
|
|
|
+ seedBroker.Close()
|
|
|
+ leader.Close()
|
|
|
+
|
|
|
+ // wait for the async-closed producer to shut down fully
|
|
|
+ for err := range producer.Errors() {
|
|
|
+ t.Error(err)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// This example shows how to use the producer while simultaneously
|
|
|
// reading the Errors channel to know about any failures.
|
|
|
func ExampleAsyncProducer_select() {
|