Prechádzať zdrojové kódy

Merge pull request #22 from Shopify/consumer_latest_flag

Add an OffsetMethod
Evan Huus 12 rokov pred
rodič
commit
96599c4142
3 zmenil súbory, kde vykonal 195 pridanie a 26 odobranie
  1. 83 7
      consumer.go
  2. 100 19
      consumer_test.go
  3. 12 0
      offset_response.go

+ 83 - 7
consumer.go

@@ -1,5 +1,17 @@
 package sarama
 
+// OffsetMethod is passed in ConsumerConfig to tell the consumer how to determine the starting offset.
+type OffsetMethod int
+
+const (
+	// OFFSET_METHOD_MANUAL causes the consumer to interpret the OffsetValue in the ConsumerConfig as the
+	// offset at which to start, allowing the user to manually specify their desired starting offset.
+	OFFSET_METHOD_MANUAL OffsetMethod = iota
+	// OFFSET_METHOD_NEWEST causes the consumer to start at the most recent available offset, as
+	// determined by querying the broker.
+	OFFSET_METHOD_NEWEST OffsetMethod = iota
+)
+
 // ConsumerConfig is used to pass multiple configuration options to NewConsumer.
 type ConsumerConfig struct {
 	// The default (maximum) amount of data to fetch from the broker in each request. The default of 0 is treated as 1024 bytes.
@@ -13,8 +25,10 @@ type ConsumerConfig struct {
 	// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
 	// returns fewer than that anyways. The default of 0 is treated as no limit.
 	MaxWaitTime int32
-	// The offset to start fetching messages from
-	StartingOffset int64
+	// The method used to determine at which offset to begin consuming messages.
+	OffsetMethod OffsetMethod
+	// Interpreted differently according to the value of OffsetMethod.
+	OffsetValue int64
 }
 
 // ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
@@ -69,10 +83,6 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 		return nil, ConfigurationError("Invalid MaxWaitTime")
 	}
 
-	if config.StartingOffset < 0 {
-		return nil, ConfigurationError("Invalid StartingOffset")
-	}
-
 	broker, err := client.leader(topic, partition)
 	if err != nil {
 		return nil, err
@@ -84,8 +94,23 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 	c.partition = partition
 	c.group = group
 	c.config = *config
-	c.offset = config.StartingOffset
 	c.broker = broker
+
+	switch config.OffsetMethod {
+	case OFFSET_METHOD_MANUAL:
+		if config.OffsetValue < 0 {
+			return nil, ConfigurationError("OffsetValue cannot be < 0 when OffsetMethod is MANUAL")
+		}
+		c.offset = config.OffsetValue
+	case OFFSET_METHOD_NEWEST:
+		c.offset, err = c.getLatestOffset(true)
+		if err != nil {
+			return nil, err
+		}
+	default:
+		return nil, ConfigurationError("Invalid OffsetMethod")
+	}
+
 	c.stopper = make(chan bool)
 	c.done = make(chan bool)
 	c.events = make(chan *ConsumerEvent)
@@ -236,3 +261,54 @@ func (c *Consumer) fetchMessages() {
 		}
 	}
 }
+
+func (c *Consumer) getLatestOffset(retry bool) (int64, error) {
+	request := &OffsetRequest{}
+	request.AddBlock(c.topic, c.partition, LATEST_OFFSETS, 1)
+
+	response, err := c.broker.GetAvailableOffsets(c.client.id, request)
+	switch err {
+	case nil:
+		break
+	case EncodingError:
+		return -1, err
+	default:
+		if !retry {
+			return -1, err
+		}
+		c.client.disconnectBroker(c.broker)
+		c.broker, err = c.client.leader(c.topic, c.partition)
+		if err != nil {
+			return -1, err
+		}
+		return c.getLatestOffset(false)
+	}
+
+	block := response.GetBlock(c.topic, c.partition)
+	if block == nil {
+		return -1, IncompleteResponse
+	}
+
+	switch block.Err {
+	case NO_ERROR:
+		if len(block.Offsets) < 1 {
+			return -1, IncompleteResponse
+		}
+		return block.Offsets[0], nil
+	case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION, LEADER_NOT_AVAILABLE:
+		if !retry {
+			return -1, block.Err
+		}
+		err = c.client.refreshTopic(c.topic)
+		if err != nil {
+			return -1, err
+		}
+		c.broker, err = c.client.leader(c.topic, c.partition)
+		if err != nil {
+			return -1, err
+		}
+		return c.getLatestOffset(false)
+	}
+
+	return -1, block.Err
+}

+ 100 - 19
consumer_test.go

@@ -7,16 +7,17 @@ import (
 	"time"
 )
 
-func TestSimpleConsumer(t *testing.T) {
-	masterResponses := make(chan []byte, 1)
-	extraResponses := make(chan []byte)
-	mockBroker := NewMockBroker(t, masterResponses)
-	mockExtra := NewMockBroker(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := []byte{
+var (
+	consumerStopper = []byte{
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00,
+	}
+	extraBrokerMetadata = []byte{
 		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
@@ -29,7 +30,21 @@ func TestSimpleConsumer(t *testing.T) {
 		0x00, 0x00, 0x00, 0x00,
 		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
+		0x00, 0x00, 0x00, 0x00,
+	}
+)
+
+func TestSimpleConsumer(t *testing.T) {
+	masterResponses := make(chan []byte, 1)
+	extraResponses := make(chan []byte)
+	mockBroker := NewMockBroker(t, masterResponses)
+	mockExtra := NewMockBroker(t, extraResponses)
+	defer mockBroker.Close()
+	defer mockExtra.Close()
+
+	// return the extra mock as another available broker
+	response := make([]byte, len(extraBrokerMetadata))
+	copy(response, extraBrokerMetadata)
 	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
 	masterResponses <- response
 	go func() {
@@ -54,14 +69,7 @@ func TestSimpleConsumer(t *testing.T) {
 			binary.BigEndian.PutUint64(msg[36:], uint64(i))
 			extraResponses <- msg
 		}
-		extraResponses <- []byte{
-			0x00, 0x00, 0x00, 0x01,
-			0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-			0x00, 0x00, 0x00, 0x01,
-			0x00, 0x00, 0x00, 0x00,
-			0x00, 0x00,
-			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-			0x00, 0x00, 0x00, 0x00}
+		extraResponses <- consumerStopper
 	}()
 
 	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
@@ -87,6 +95,79 @@ func TestSimpleConsumer(t *testing.T) {
 	}
 }
 
+func TestConsumerRawOffset(t *testing.T) {
+	masterResponses := make(chan []byte, 1)
+	extraResponses := make(chan []byte, 1)
+	mockBroker := NewMockBroker(t, masterResponses)
+	mockExtra := NewMockBroker(t, extraResponses)
+	defer mockBroker.Close()
+	defer mockExtra.Close()
+
+	// return the extra mock as another available broker
+	response := make([]byte, len(extraBrokerMetadata))
+	copy(response, extraBrokerMetadata)
+	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
+	masterResponses <- response
+	extraResponses <- consumerStopper
+
+	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", &ConsumerConfig{OffsetMethod: OFFSET_METHOD_MANUAL, OffsetValue: 1234})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+
+	if consumer.offset != 1234 {
+		t.Error("Raw offset not set correctly")
+	}
+}
+
+func TestConsumerLatestOffset(t *testing.T) {
+	masterResponses := make(chan []byte, 1)
+	extraResponses := make(chan []byte, 2)
+	mockBroker := NewMockBroker(t, masterResponses)
+	mockExtra := NewMockBroker(t, extraResponses)
+	defer mockBroker.Close()
+	defer mockExtra.Close()
+
+	// return the extra mock as another available broker
+	response := make([]byte, len(extraBrokerMetadata))
+	copy(response, extraBrokerMetadata)
+	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
+	masterResponses <- response
+	extraResponses <- []byte{
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01,
+	}
+	extraResponses <- consumerStopper
+
+	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", &ConsumerConfig{OffsetMethod: OFFSET_METHOD_NEWEST})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+
+	if consumer.offset != 0x010101 {
+		t.Error("Latest offset not fetched correctly")
+	}
+}
+
 func ExampleConsumer() {
 	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
 	if err != nil {

+ 12 - 0
offset_response.go

@@ -58,3 +58,15 @@ func (r *OffsetResponse) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock {
+	if r.Blocks == nil {
+		return nil
+	}
+
+	if r.Blocks[topic] == nil {
+		return nil
+	}
+
+	return r.Blocks[topic][partition]
+}