Переглянути джерело

Merge pull request #26 from Shopify/offset_method_oldest

Add consumer OFFSET_METHOD_OLDEST
Evan Huus 12 роки тому
батько
коміт
907691e531
1 змінених файлів з 14 додано та 6 видалено
  1. 14 6
      consumer.go

+ 14 - 6
consumer.go

@@ -9,7 +9,10 @@ const (
 	OFFSET_METHOD_MANUAL OffsetMethod = iota
 	// OFFSET_METHOD_NEWEST causes the consumer to start at the most recent available offset, as
 	// determined by querying the broker.
-	OFFSET_METHOD_NEWEST OffsetMethod = iota
+	OFFSET_METHOD_NEWEST
+	// OFFSET_METHOD_OLDEST causes the consumer to start at the oldest available offset, as
+	// determined by querying the broker.
+	OFFSET_METHOD_OLDEST
 )
 
 // ConsumerConfig is used to pass multiple configuration options to NewConsumer.
@@ -113,7 +116,12 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 		}
 		c.offset = config.OffsetValue
 	case OFFSET_METHOD_NEWEST:
-		c.offset, err = c.getLatestOffset(true)
+		c.offset, err = c.getOffset(LATEST_OFFSETS, true)
+		if err != nil {
+			return nil, err
+		}
+	case OFFSET_METHOD_OLDEST:
+		c.offset, err = c.getOffset(EARLIEST_OFFSET, true)
 		if err != nil {
 			return nil, err
 		}
@@ -272,9 +280,9 @@ func (c *Consumer) fetchMessages() {
 	}
 }
 
-func (c *Consumer) getLatestOffset(retry bool) (int64, error) {
+func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
 	request := &OffsetRequest{}
-	request.AddBlock(c.topic, c.partition, LATEST_OFFSETS, 1)
+	request.AddBlock(c.topic, c.partition, where, 1)
 
 	response, err := c.broker.GetAvailableOffsets(c.client.id, request)
 	switch err {
@@ -291,7 +299,7 @@ func (c *Consumer) getLatestOffset(retry bool) (int64, error) {
 		if err != nil {
 			return -1, err
 		}
-		return c.getLatestOffset(false)
+		return c.getOffset(where, false)
 	}
 
 	block := response.GetBlock(c.topic, c.partition)
@@ -317,7 +325,7 @@ func (c *Consumer) getLatestOffset(retry bool) (int64, error) {
 		if err != nil {
 			return -1, err
 		}
-		return c.getLatestOffset(false)
+		return c.getOffset(where, false)
 	}
 
 	return -1, block.Err