Browse Source

Add a test for bug #294

Skip it until the bug is fixed, but at least it will keep up with the API
changes now. Before it was inline in the ticket, and was falling behind.
Evan Huus 10 years ago
parent
commit
437cd2f8c4
1 changed files with 63 additions and 0 deletions
  1. 63 0
      producer_test.go

+ 63 - 0
producer_test.go

@@ -509,6 +509,69 @@ func TestProducerMultipleRetries(t *testing.T) {
 	closeProducer(t, producer)
 }
 
+func TestProducerOutOfRetries(t *testing.T) {
+	t.Skip("Enable once bug #294 is fixed.")
+
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.AckSuccesses = true
+	config.Producer.Retry.Backoff = 0
+	config.Producer.Retry.Max = 0
+	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 10; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	}
+
+	prodNotLeader := new(ProduceResponse)
+	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
+	leader.Returns(prodNotLeader)
+
+	for i := 0; i < 10; i++ {
+		select {
+		case msg := <-producer.Errors():
+			if msg.Err != ErrNotLeaderForPartition {
+				t.Error(msg.Err)
+			}
+		case <-producer.Successes():
+			t.Error("Unexpected success")
+		}
+	}
+
+	seedBroker.Returns(metadataResponse)
+
+	for i := 0; i < 10; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	}
+
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader.Returns(prodSuccess)
+
+	for i := 0; i < 10; i++ {
+		select {
+		case msg := <-producer.Errors():
+			t.Error(msg.Err)
+		case <-producer.Successes():
+		}
+	}
+
+	leader.Close()
+	seedBroker.Close()
+	safeClose(t, producer)
+}
+
 // This example shows how to use the producer while simultaneously
 // reading the Errors channel to know about any failures.
 func ExampleProducer_select() {