Bladeren bron

Merge pull request #424 from Shopify/stop-consuming-offset-out-of-range

consumer: shut down on OffsetOutOfRange
Evan Huus 10 jaren geleden
bovenliggende
commit
ac599949be
2 gewijzigde bestanden met toevoegingen van 51 en 5 verwijderingen
  1. 11 5
      consumer.go
  2. 40 0
      consumer_test.go

+ 11 - 5
consumer.go

@@ -211,11 +211,12 @@ func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
 // when it passes out of scope.
 //
 // The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
-// loop. The PartitionConsumer will under no circumstances stop by itself once it is started, it will
-// just keep retrying if it encounters errors. By default, it logs these errors to sarama.Logger;
-// if you want to handle errors yourself, set your config's Consumer.Return.Errors to true, and read
-// from the Errors channel as well, using a select statement or in a separate goroutine. Check out
-// the examples of Consumer to see examples of these different approaches.
+// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported
+// as out of range by the brokers. In this case you should decide what you want to do (try a different offset,
+// notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying.
+// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set
+// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement
+// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
 type PartitionConsumer interface {
 
 	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately,
@@ -516,6 +517,11 @@ func (w *brokerConsumer) subscriptionConsumer() {
 		for child := range w.subscriptions {
 			if err := child.handleResponse(response); err != nil {
 				switch err {
+				case ErrOffsetOutOfRange:
+					// there's no point in retrying this it will just fail the same way again
+					// so shut it down and force the user to choose what to do
+					child.AsyncClose()
+					fallthrough
 				default:
 					child.sendError(err)
 					fallthrough

+ 40 - 0
consumer_test.go

@@ -101,6 +101,46 @@ func TestConsumerLatestOffset(t *testing.T) {
 	}
 }
 
+func TestConsumerShutsDownOutOfRange(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	offsetResponseNewest := new(OffsetResponse)
+	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
+	leader.Returns(offsetResponseNewest)
+
+	offsetResponseOldest := new(OffsetResponse)
+	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
+	leader.Returns(offsetResponseOldest)
+
+	fetchResponse := new(FetchResponse)
+	fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
+	leader.Returns(fetchResponse)
+
+	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	seedBroker.Close()
+
+	consumer, err := master.ConsumePartition("my_topic", 0, 101)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if _, ok := <-consumer.Messages(); ok {
+		t.Error("Expected the consumer to shut down")
+	}
+
+	leader.Close()
+	safeClose(t, master)
+}
+
 func TestConsumerFunnyOffsets(t *testing.T) {
 	// for topics that are compressed and/or compacted (different things!) we have to be
 	// able to handle receiving offsets that are non-sequential (though still strictly increasing) and