Browse Source

consumer: shut down on OffsetOutOfRange

The old behaviour was to redispatch it (so it would go to another broker if
necessary) and then retry with the same offset, which was a rather useless thing
to do unless the offset had somehow ended up slightly ahead of the available
messages (which is unlikely - it is far more likely to fall behind).

Instead, simply shut down the PartitionConsumer so the user gets an error (if
they're subscribed) and their messages channel closes. They then get to choose
whether to give up and switch to a different offset, yell for a human, or
whatever.
Evan Huus 10 years ago
parent
commit
c8b3e2e53f
2 changed files with 45 additions and 0 deletions
  1. 5 0
      consumer.go
  2. 40 0
      consumer_test.go

+ 5 - 0
consumer.go

@@ -516,6 +516,11 @@ func (w *brokerConsumer) subscriptionConsumer() {
 		for child := range w.subscriptions {
 			if err := child.handleResponse(response); err != nil {
 				switch err {
+				case ErrOffsetOutOfRange:
+					// there's no point in retrying this it will just fail the same way again
+					// so shut it down and force the user to choose what to do
+					child.AsyncClose()
+					fallthrough
 				default:
 					child.sendError(err)
 					fallthrough

+ 40 - 0
consumer_test.go

@@ -101,6 +101,46 @@ func TestConsumerLatestOffset(t *testing.T) {
 	}
 }
 
+func TestConsumerShutsDownOutOfRange(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	offsetResponseNewest := new(OffsetResponse)
+	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
+	leader.Returns(offsetResponseNewest)
+
+	offsetResponseOldest := new(OffsetResponse)
+	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
+	leader.Returns(offsetResponseOldest)
+
+	fetchResponse := new(FetchResponse)
+	fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
+	leader.Returns(fetchResponse)
+
+	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	seedBroker.Close()
+
+	consumer, err := master.ConsumePartition("my_topic", 0, 101)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if _, ok := <-consumer.Messages(); ok {
+		t.Error("Expected the consumer to shut down")
+	}
+
+	leader.Close()
+	safeClose(t, master)
+}
+
 func TestConsumerFunnyOffsets(t *testing.T) {
 	// for topics that are compressed and/or compacted (different things!) we have to be
 	// able to handle receiving offsets that are non-sequential (though still strictly increasing) and