Browse Source

Producer: block in `Close` without `Return.Errors`

If neither `Return.Errors` nor `Return.Successes` was set then `Close` wouldn't
actually block at all. This is arguably correct given the send-it-into-the-void-
and-pray configuration but it was still surprising.

It turns out to be a very minimal change to block anyway (since the `Errors`
channel is still present and closed at the appropriate moment) so just do that.
Also add a test for this configuration, since it's not one I remember to think
about on the regular.
Evan Huus 8 years ago
parent
commit
17b655d247
2 changed files with 42 additions and 0 deletions
  1. 2 0
      async_producer.go
  2. 40 0
      async_producer_test.go

+ 2 - 0
async_producer.go

@@ -210,6 +210,8 @@ func (p *asyncProducer) Close() error {
 		for event := range p.errors {
 			errors = append(errors, event)
 		}
+	} else {
+		<-p.errors
 	}
 
 	if len(errors) > 0 {

+ 40 - 0
async_producer_test.go

@@ -709,6 +709,46 @@ func TestAsyncProducerRetryShutdown(t *testing.T) {
 	}
 }
 
+func TestAsyncProducerNoReturns(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	leader := NewMockBroker(t, 2)
+
+	metadataLeader := new(MetadataResponse)
+	metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataLeader)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.Return.Successes = false
+	config.Producer.Return.Errors = false
+	config.Producer.Retry.Backoff = 0
+	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 10; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	}
+
+	wait := make(chan bool)
+	go func() {
+		if err := producer.Close(); err != nil {
+			t.Error(err)
+		}
+		close(wait)
+	}()
+
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader.Returns(prodSuccess)
+
+	<-wait
+	seedBroker.Close()
+	leader.Close()
+}
+
 // This example shows how to use the producer while simultaneously
 // reading the Errors channel to know about any failures.
 func ExampleAsyncProducer_select() {