Browse Source

Combine the OffsetMethod and Offset parameters

Evan Huus 10 years ago
parent
commit
bce6b9ae7e
3 changed files with 29 additions and 36 deletions
  1. 22 29
      consumer.go
  2. 6 6
      consumer_test.go
  3. 1 1
      functional_test.go

+ 22 - 29
consumer.go

@@ -6,21 +6,6 @@ import (
 	"time"
 )
 
-// OffsetMethod is passed to ConsumePartition to tell the consumer how to determine the starting offset.
-type OffsetMethod int
-
-const (
-	// OffsetMethodNewest causes the consumer to start at the most recent available offset, as
-	// determined by querying the broker.
-	OffsetMethodNewest OffsetMethod = iota
-	// OffsetMethodOldest causes the consumer to start at the oldest available offset, as
-	// determined by querying the broker.
-	OffsetMethodOldest
-	// OffsetMethodManual causes the consumer to interpret the offset value as the
-	// offset at which to start, allowing the user to manually specify their desired starting offset.
-	OffsetMethodManual
-)
-
 // ConsumerMessage encapsulates a Kafka message returned by the consumer.
 type ConsumerMessage struct {
 	Key, Value []byte
@@ -85,9 +70,19 @@ func NewConsumer(client *Client, config *Config) (*Consumer, error) {
 	return c, nil
 }
 
-// ConsumePartition creates a PartitionConsumer on the given topic/partition with the given configuration. It will
-// return an error if this Consumer is already consuming on the given topic/partition.
-func (c *Consumer) ConsumePartition(topic string, partition int32, method OffsetMethod, offset int64) (*PartitionConsumer, error) {
+const (
+	// OffsetNewest causes the consumer to start at the most recent available offset, as
+	// determined by querying the broker.
+	OffsetNewest int64 = -1
+	// OffsetOldest causes the consumer to start at the oldest available offset, as
+	// determined by querying the broker.
+	OffsetOldest int64 = -2
+)
+
+// ConsumePartition creates a PartitionConsumer on the given topic/partition with the given offset. It will
+// return an error if this Consumer is already consuming on the given topic/partition. Offset can be a
+// literal offset, or OffsetNewest or OffsetOldest
+func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (*PartitionConsumer, error) {
 	child := &PartitionConsumer{
 		consumer:  c,
 		conf:      c.conf,
@@ -100,7 +95,7 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, method Offset
 		fetchSize: c.conf.Consumer.Fetch.Default,
 	}
 
-	if err := child.chooseStartingOffset(method, offset); err != nil {
+	if err := child.chooseStartingOffset(offset); err != nil {
 		return nil, err
 	}
 
@@ -265,22 +260,20 @@ func (child *PartitionConsumer) dispatch() error {
 	return nil
 }
 
-func (child *PartitionConsumer) chooseStartingOffset(method OffsetMethod, offset int64) (err error) {
+func (child *PartitionConsumer) chooseStartingOffset(offset int64) (err error) {
 	var where OffsetTime
 
-	switch method {
-	case OffsetMethodManual:
+	switch offset {
+	case OffsetNewest:
+		where = LatestOffsets
+	case OffsetOldest:
+		where = EarliestOffset
+	default:
 		if offset < 0 {
-			return ConfigurationError("offset must be >= 0 when the method is manual")
+			return ConfigurationError("Invalid offset")
 		}
 		child.offset = offset
 		return nil
-	case OffsetMethodNewest:
-		where = LatestOffsets
-	case OffsetMethodOldest:
-		where = EarliestOffset
-	default:
-		return ConfigurationError("Invalid OffsetMethod")
 	}
 
 	child.offset, err = child.consumer.client.GetOffset(child.topic, child.partition, where)

+ 6 - 6
consumer_test.go

@@ -33,7 +33,7 @@ func TestConsumerOffsetManual(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	consumer, err := master.ConsumePartition("my_topic", 0, OffsetMethodManual, 1234)
+	consumer, err := master.ConsumePartition("my_topic", 0, 1234)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -83,7 +83,7 @@ func TestConsumerLatestOffset(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	consumer, err := master.ConsumePartition("my_topic", 0, OffsetMethodNewest, 0)
+	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -129,7 +129,7 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	consumer, err := master.ConsumePartition("my_topic", 0, OffsetMethodManual, 2)
+	consumer, err := master.ConsumePartition("my_topic", 0, 2)
 
 	message := <-consumer.Messages()
 	if message.Offset != 3 {
@@ -169,7 +169,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	// we expect to end up (eventually) consuming exactly ten messages on each partition
 	var wg sync.WaitGroup
 	for i := 0; i < 2; i++ {
-		consumer, err := master.ConsumePartition("my_topic", int32(i), OffsetMethodManual, 0)
+		consumer, err := master.ConsumePartition("my_topic", int32(i), 0)
 		if err != nil {
 			t.Error(err)
 		}
@@ -288,7 +288,7 @@ func ExampleConsumerWithSelect() {
 		fmt.Println("> master consumer ready")
 	}
 
-	consumer, err := master.ConsumePartition("my_topic", 0, OffsetMethodManual, 0)
+	consumer, err := master.ConsumePartition("my_topic", 0, 0)
 	if err != nil {
 		panic(err)
 	} else {
@@ -337,7 +337,7 @@ func ExampleConsumerWithGoroutines() {
 		fmt.Println("> master consumer ready")
 	}
 
-	consumer, err := master.ConsumePartition("my_topic", 0, OffsetMethodManual, 0)
+	consumer, err := master.ConsumePartition("my_topic", 0, 0)
 	if err != nil {
 		panic(err)
 	} else {

+ 1 - 1
functional_test.go

@@ -136,7 +136,7 @@ func testProducingMessages(t *testing.T, config *Config) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	consumer, err := master.ConsumePartition("single_partition", 0, OffsetMethodNewest, 0)
+	consumer, err := master.ConsumePartition("single_partition", 0, OffsetNewest)
 	if err != nil {
 		t.Fatal(err)
 	}