Explorar el Código

Add test for encoding failures

Evan Huus hace 10 años
padre
commit
7e88a995fe
Se han modificado 1 ficheros con 56 adiciones y 1 borrados
  1. 56 1
      async_producer_test.go

+ 56 - 1
async_producer_test.go

@@ -33,13 +33,15 @@ func closeProducer(t *testing.T, p AsyncProducer) {
 }
 
 func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
-	for successes > 0 || errors > 0 {
+	expect := successes + errors
+	for expect > 0 {
 		select {
 		case msg := <-p.Errors():
 			if msg.Msg.flags != 0 {
 				t.Error("Message had flags set")
 			}
 			errors--
+			expect--
 			if errors < 0 {
 				t.Error(msg.Err)
 			}
@@ -48,11 +50,15 @@ func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
 				t.Error("Message had flags set")
 			}
 			successes--
+			expect--
 			if successes < 0 {
 				t.Error("Too many successes")
 			}
 		}
 	}
+	if successes != 0 || errors != 0 {
+		t.Error("Unexpected successes", successes, "or errors", errors)
+	}
 }
 
 type testPartitioner chan *int32
@@ -74,6 +80,19 @@ func (p testPartitioner) feed(partition int32) {
 	p <- &partition
 }
 
+type flakyEncoder bool
+
+func (f flakyEncoder) Length() int {
+	return len(TestMessage)
+}
+
+func (f flakyEncoder) Encode() ([]byte, error) {
+	if !bool(f) {
+		return nil, errors.New("flaky encoding error")
+	}
+	return []byte(TestMessage), nil
+}
+
 func TestAsyncProducer(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 2)
@@ -285,6 +304,42 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
 	closeProducer(t, producer)
 }
 
+func TestAsyncProducerEncoderFailures(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader.Returns(prodSuccess)
+	leader.Returns(prodSuccess)
+	leader.Returns(prodSuccess)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 3
+	config.Producer.Return.Successes = true
+	config.Producer.Partitioner = NewManualPartitioner
+	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for flush := 0; flush < 3; flush++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
+		expectResults(t, producer, 1, 2)
+	}
+
+	closeProducer(t, producer)
+	leader.Close()
+	seedBroker.Close()
+}
+
 // If a Kafka broker becomes unavailable and then returns back in service, then
 // producer reconnects to it and continues sending messages.
 func TestAsyncProducerBrokerBounce(t *testing.T) {