浏览代码

Add consumer OFFSET_METHOD_OLDEST

Like _NEWEST but for fetching from the oldest available offset.
Evan Huus 12 年之前
父节点
当前提交
aeff5a9a6c
共有 1 个文件被更改,包括 13 次插入5 次删除
  1. 13 5
      consumer.go

+ 13 - 5
consumer.go

@@ -10,6 +10,9 @@ const (
 	// OFFSET_METHOD_NEWEST causes the consumer to start at the most recent available offset, as
 	// determined by querying the broker.
 	OFFSET_METHOD_NEWEST OffsetMethod = iota
+	// OFFSET_METHOD_OLDEST causes the consumer to start at the oldest available offset, as
+	// determined by querying the broker.
+	OFFSET_METHOD_OLDEST OffsetMethod = iota
 )
 
 // ConsumerConfig is used to pass multiple configuration options to NewConsumer.
@@ -113,7 +116,12 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 		}
 		c.offset = config.OffsetValue
 	case OFFSET_METHOD_NEWEST:
-		c.offset, err = c.getLatestOffset(true)
+		c.offset, err = c.getOffset(LATEST_OFFSETS, true)
+		if err != nil {
+			return nil, err
+		}
+	case OFFSET_METHOD_OLDEST:
+		c.offset, err = c.getOffset(EARLIEST_OFFSET, true)
 		if err != nil {
 			return nil, err
 		}
@@ -272,9 +280,9 @@ func (c *Consumer) fetchMessages() {
 	}
 }
 
-func (c *Consumer) getLatestOffset(retry bool) (int64, error) {
+func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
 	request := &OffsetRequest{}
-	request.AddBlock(c.topic, c.partition, LATEST_OFFSETS, 1)
+	request.AddBlock(c.topic, c.partition, where, 1)
 
 	response, err := c.broker.GetAvailableOffsets(c.client.id, request)
 	switch err {
@@ -291,7 +299,7 @@ func (c *Consumer) getLatestOffset(retry bool) (int64, error) {
 		if err != nil {
 			return -1, err
 		}
-		return c.getLatestOffset(false)
+		return c.getOffset(where, false)
 	}
 
 	block := response.GetBlock(c.topic, c.partition)
@@ -317,7 +325,7 @@ func (c *Consumer) getLatestOffset(retry bool) (int64, error) {
 		if err != nil {
 			return -1, err
 		}
-		return c.getLatestOffset(false)
+		return c.getOffset(where, false)
 	}
 
 	return -1, block.Err