|
|
@@ -509,6 +509,69 @@ func TestProducerMultipleRetries(t *testing.T) {
|
|
|
closeProducer(t, producer)
|
|
|
}
|
|
|
|
|
|
+func TestProducerOutOfRetries(t *testing.T) {
|
|
|
+ t.Skip("Enable once bug #294 is fixed.")
|
|
|
+
|
|
|
+ seedBroker := newMockBroker(t, 1)
|
|
|
+ leader := newMockBroker(t, 2)
|
|
|
+
|
|
|
+ metadataResponse := new(MetadataResponse)
|
|
|
+ metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
+
|
|
|
+ config := NewConfig()
|
|
|
+ config.Producer.Flush.Messages = 10
|
|
|
+ config.Producer.AckSuccesses = true
|
|
|
+ config.Producer.Retry.Backoff = 0
|
|
|
+ config.Producer.Retry.Max = 0
|
|
|
+ producer, err := NewProducer([]string{seedBroker.Addr()}, config)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ for i := 0; i < 10; i++ {
|
|
|
+ producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
+ }
|
|
|
+
|
|
|
+ prodNotLeader := new(ProduceResponse)
|
|
|
+ prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
|
|
|
+ leader.Returns(prodNotLeader)
|
|
|
+
|
|
|
+ for i := 0; i < 10; i++ {
|
|
|
+ select {
|
|
|
+ case msg := <-producer.Errors():
|
|
|
+ if msg.Err != ErrNotLeaderForPartition {
|
|
|
+ t.Error(msg.Err)
|
|
|
+ }
|
|
|
+ case <-producer.Successes():
|
|
|
+ t.Error("Unexpected success")
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
+
|
|
|
+ for i := 0; i < 10; i++ {
|
|
|
+ producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
+ }
|
|
|
+
|
|
|
+ prodSuccess := new(ProduceResponse)
|
|
|
+ prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
+ leader.Returns(prodSuccess)
|
|
|
+
|
|
|
+ for i := 0; i < 10; i++ {
|
|
|
+ select {
|
|
|
+ case msg := <-producer.Errors():
|
|
|
+ t.Error(msg.Err)
|
|
|
+ case <-producer.Successes():
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ leader.Close()
|
|
|
+ seedBroker.Close()
|
|
|
+ safeClose(t, producer)
|
|
|
+}
|
|
|
+
|
|
|
// This example shows how to use the producer while simultaneously
|
|
|
// reading the Errors channel to know about any failures.
|
|
|
func ExampleProducer_select() {
|