Kaynağa Gözat

Merge pull request #418 from Shopify/better_offset_range_handling

Consumer: check offset before returning ConsumePartition.
Willem van Bergen 10 yıl önce
ebeveyn
işleme
1221a24ea4
4 değiştirilmiş dosya ile 179 ekleme ve 19 silme
  1. 18 12
      consumer.go
  2. 135 7
      consumer_test.go
  3. 25 0
      functional_consumer_test.go
  4. 1 0
      mockbroker_test.go

+ 18 - 12
consumer.go

@@ -313,22 +313,28 @@ func (child *partitionConsumer) dispatch() error {
 	return nil
 }
 
-func (child *partitionConsumer) chooseStartingOffset(offset int64) (err error) {
-	var time int64
+func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
+	newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
+	if err != nil {
+		return err
+	}
+	oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
+	if err != nil {
+		return err
+	}
 
-	switch offset {
-	case OffsetNewest, OffsetOldest:
-		time = offset
-	default:
-		if offset < 0 {
-			return ConfigurationError("Invalid offset")
-		}
+	switch {
+	case offset == OffsetNewest:
+		child.offset = newestOffset
+	case offset == OffsetOldest:
+		child.offset = oldestOffset
+	case offset >= oldestOffset && offset <= newestOffset:
 		child.offset = offset
-		return nil
+	default:
+		return ErrOffsetOutOfRange
 	}
 
-	child.offset, err = child.consumer.client.GetOffset(child.topic, child.partition, time)
-	return err
+	return nil
 }
 
 func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {

+ 135 - 7
consumer_test.go

@@ -18,6 +18,14 @@ func TestConsumerOffsetManual(t *testing.T) {
 	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
+	offsetResponseNewest := new(OffsetResponse)
+	offsetResponseNewest.AddTopicPartition("my_topic", 0, 2345)
+	leader.Returns(offsetResponseNewest)
+
+	offsetResponseOldest := new(OffsetResponse)
+	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
+	leader.Returns(offsetResponseOldest)
+
 	for i := 0; i <= 10; i++ {
 		fetchResponse := new(FetchResponse)
 		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
@@ -60,9 +68,13 @@ func TestConsumerLatestOffset(t *testing.T) {
 	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
-	offsetResponse := new(OffsetResponse)
-	offsetResponse.AddTopicPartition("my_topic", 0, 0x010101)
-	leader.Returns(offsetResponse)
+	offsetResponseNewest := new(OffsetResponse)
+	offsetResponseNewest.AddTopicPartition("my_topic", 0, 0x010102)
+	leader.Returns(offsetResponseNewest)
+
+	offsetResponseOldest := new(OffsetResponse)
+	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0x010101)
+	leader.Returns(offsetResponseOldest)
 
 	fetchResponse := new(FetchResponse)
 	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
@@ -101,6 +113,14 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
+	offsetResponseNewest := new(OffsetResponse)
+	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
+	leader.Returns(offsetResponseNewest)
+
+	offsetResponseOldest := new(OffsetResponse)
+	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
+	leader.Returns(offsetResponseOldest)
+
 	fetchResponse := new(FetchResponse)
 	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
 	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
@@ -152,10 +172,26 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 		t.Fatal(err)
 	}
 
+	offsetResponseNewest0 := new(OffsetResponse)
+	offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
+	leader0.Returns(offsetResponseNewest0)
+
+	offsetResponseOldest0 := new(OffsetResponse)
+	offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
+	leader0.Returns(offsetResponseOldest0)
+
+	offsetResponseNewest1 := new(OffsetResponse)
+	offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
+	leader1.Returns(offsetResponseNewest1)
+
+	offsetResponseOldest1 := new(OffsetResponse)
+	offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
+	leader1.Returns(offsetResponseOldest1)
+
 	// we expect to end up (eventually) consuming exactly ten messages on each partition
 	var wg sync.WaitGroup
-	for i := 0; i < 2; i++ {
-		consumer, err := master.ConsumePartition("my_topic", int32(i), 0)
+	for i := int32(0); i < 2; i++ {
+		consumer, err := master.ConsumePartition("my_topic", i, 0)
 		if err != nil {
 			t.Error(err)
 		}
@@ -179,7 +215,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 			}
 			safeClose(t, consumer)
 			wg.Done()
-		}(int32(i), consumer)
+		}(i, consumer)
 	}
 
 	// leader0 provides first four messages on partition 0
@@ -273,6 +309,14 @@ func TestConsumerInterleavedClose(t *testing.T) {
 		t.Fatal(err)
 	}
 
+	offsetResponseNewest0 := new(OffsetResponse)
+	offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
+	leader.Returns(offsetResponseNewest0)
+
+	offsetResponseOldest0 := new(OffsetResponse)
+	offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
+	leader.Returns(offsetResponseOldest0)
+
 	c0, err := master.ConsumePartition("my_topic", 0, 0)
 	if err != nil {
 		t.Fatal(err)
@@ -282,6 +326,14 @@ func TestConsumerInterleavedClose(t *testing.T) {
 	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
 	leader.Returns(fetchResponse)
 
+	offsetResponseNewest1 := new(OffsetResponse)
+	offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
+	leader.Returns(offsetResponseNewest1)
+
+	offsetResponseOldest1 := new(OffsetResponse)
+	offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
+	leader.Returns(offsetResponseOldest1)
+
 	c1, err := master.ConsumePartition("my_topic", 1, 0)
 	if err != nil {
 		t.Fatal(err)
@@ -301,11 +353,13 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 2)
 	leaderAddr := leader.Addr()
+	tmp := newMockBroker(t, 3)
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddBroker(tmp.Addr(), tmp.BrokerID())
 	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, tmp.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	config := NewConfig()
@@ -317,17 +371,44 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
 		t.Fatal(err)
 	}
 
+	offsetResponseNewest := new(OffsetResponse)
+	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
+	leader.Returns(offsetResponseNewest)
+
+	offsetResponseOldest := new(OffsetResponse)
+	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
+	leader.Returns(offsetResponseOldest)
+
 	c0, err := master.ConsumePartition("my_topic", 0, 0)
 	if err != nil {
 		t.Fatal(err)
 	}
 
+	offsetResponseNewest = new(OffsetResponse)
+	offsetResponseNewest.AddTopicPartition("my_topic", 1, 1234)
+	tmp.Returns(offsetResponseNewest)
+
+	offsetResponseOldest = new(OffsetResponse)
+	offsetResponseOldest.AddTopicPartition("my_topic", 1, 0)
+	tmp.Returns(offsetResponseOldest)
+
 	c1, err := master.ConsumePartition("my_topic", 1, 0)
 	if err != nil {
 		t.Fatal(err)
 	}
 
+	//redirect partition 1 back to main leader
 	fetchResponse := new(FetchResponse)
+	fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
+	tmp.Returns(fetchResponse)
+	metadataResponse = new(MetadataResponse)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+	time.Sleep(5 * time.Millisecond)
+
+	// now send one message to each partition to make sure everything is primed
+	fetchResponse = new(FetchResponse)
 	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
 	fetchResponse.AddError("my_topic", 1, ErrNoError)
 	leader.Returns(fetchResponse)
@@ -339,6 +420,7 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
 	leader.Returns(fetchResponse)
 	<-c1.Messages()
 
+	// bounce the broker
 	leader.Close()
 	leader = newMockBrokerAddr(t, 2, leaderAddr)
 
@@ -365,6 +447,8 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
 	// send it back to the same broker
 	seedBroker.Returns(metadataResponse)
 
+	time.Sleep(5 * time.Millisecond)
+
 	select {
 	case <-c0.Messages():
 	case <-c1.Messages():
@@ -384,6 +468,50 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
 	}()
 	wg.Wait()
 	safeClose(t, master)
+	tmp.Close()
+}
+
+func TestConsumerOffsetOutOfRange(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	seedBroker.Close()
+
+	offsetResponseNewest := new(OffsetResponse)
+	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
+
+	offsetResponseOldest := new(OffsetResponse)
+	offsetResponseOldest.AddTopicPartition("my_topic", 0, 2345)
+
+	leader.Returns(offsetResponseNewest)
+	leader.Returns(offsetResponseOldest)
+	if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
+		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
+	}
+
+	leader.Returns(offsetResponseNewest)
+	leader.Returns(offsetResponseOldest)
+	if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
+		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
+	}
+
+	leader.Returns(offsetResponseNewest)
+	leader.Returns(offsetResponseOldest)
+	if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
+		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
+	}
+
+	leader.Close()
+	safeClose(t, master)
 }
 
 // This example has the simplest use case of the consumer. It simply

+ 25 - 0
functional_consumer_test.go

@@ -0,0 +1,25 @@
+package sarama
+
+import (
+	"math"
+	"testing"
+)
+
+func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
+	checkKafkaAvailability(t)
+
+	consumer, err := NewConsumer(kafkaBrokers, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if _, err := consumer.ConsumePartition("single_partition", 0, -10); err != ErrOffsetOutOfRange {
+		t.Error("Expected ErrOffsetOutOfRange, got:", err)
+	}
+
+	if _, err := consumer.ConsumePartition("single_partition", 0, math.MaxInt64); err != ErrOffsetOutOfRange {
+		t.Error("Expected ErrOffsetOutOfRange, got:", err)
+	}
+
+	safeClose(t, consumer)
+}

+ 1 - 0
mockbroker_test.go

@@ -146,6 +146,7 @@ func newMockBrokerAddr(t *testing.T, brokerID int32, addr string) *mockBroker {
 	if err != nil {
 		t.Fatal(err)
 	}
+	Logger.Printf("mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String())
 	_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
 	if err != nil {
 		t.Fatal(err)