Explorar el Código

Add an OffsetMethod that permits specifying other methods of determining the offset, such as querying the broker

Evan Huus hace 12 años
padre
commit
2565bc81b7
Se han modificado 2 ficheros con 93 adiciones y 7 borrados
  1. 81 7
      consumer.go
  2. 12 0
      offset_response.go

+ 81 - 7
consumer.go

@@ -1,5 +1,15 @@
 package sarama
 
+// OffsetMethod is passed in ConsumerConfig to tell the consumer how to determine the starting offset.
+type OffsetMethod int
+
+const (
+	// When RAW_OFFSET is passed as the OffsetMethod, OffsetValue is interpreted as the raw offset to start at.
+	RAW_OFFSET OffsetMethod = iota
+	// When LATEST_OFFSET is passed as the OffsetMethod, the broker is queried for the most recent valid offset.
+	LATEST_OFFSET OffsetMethod = iota
+)
+
 // ConsumerConfig is used to pass multiple configuration options to NewConsumer.
 type ConsumerConfig struct {
 	// The default (maximum) amount of data to fetch from the broker in each request. The default of 0 is treated as 1024 bytes.
@@ -13,8 +23,10 @@ type ConsumerConfig struct {
 	// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
 	// returns fewer than that anyways. The default of 0 is treated as no limit.
 	MaxWaitTime int32
-	// The offset to start fetching messages from
-	StartingOffset int64
+	// The method of determining which offset to start at
+	OffsetMethod OffsetMethod
+	// Interpreted according to OffsetMethod
+	OffsetValue int64
 }
 
 // ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
@@ -69,10 +81,6 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 		return nil, ConfigurationError("Invalid MaxWaitTime")
 	}
 
-	if config.StartingOffset < 0 {
-		return nil, ConfigurationError("Invalid StartingOffset")
-	}
-
 	broker, err := client.leader(topic, partition)
 	if err != nil {
 		return nil, err
@@ -84,8 +92,23 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 	c.partition = partition
 	c.group = group
 	c.config = *config
-	c.offset = config.StartingOffset
 	c.broker = broker
+
+	switch config.OffsetMethod {
+	case RAW_OFFSET:
+		if config.OffsetValue < 0 {
+			return nil, ConfigurationError("OffsetValue cannot be < 0 when OffsetMethod is RAW_OFFSET")
+		}
+		c.offset = config.OffsetValue
+	case LATEST_OFFSET:
+		c.offset, err = c.getLatestOffset(true)
+		if err != nil {
+			return nil, err
+		}
+	default:
+		return nil, ConfigurationError("Invalid OffsetMethod")
+	}
+
 	c.stopper = make(chan bool)
 	c.done = make(chan bool)
 	c.events = make(chan *ConsumerEvent)
@@ -235,3 +258,54 @@ func (c *Consumer) fetchMessages() {
 		}
 	}
 }
+
+func (c *Consumer) getLatestOffset(retry bool) (int64, error) {
+	request := &OffsetRequest{}
+	request.AddBlock(c.topic, c.partition, LATEST_OFFSETS, 1)
+
+	response, err := c.broker.GetAvailableOffsets(c.client.id, request)
+	switch err {
+	case nil:
+		break
+	case EncodingError:
+		return -1, err
+	default:
+		if !retry {
+			return -1, err
+		}
+		c.client.disconnectBroker(c.broker)
+		c.broker, err = c.client.leader(c.topic, c.partition)
+		if err != nil {
+			return -1, err
+		}
+		return c.getLatestOffset(false)
+	}
+
+	block := response.GetBlock(c.topic, c.partition)
+	if block == nil {
+		return -1, IncompleteResponse
+	}
+
+	switch block.Err {
+	case NO_ERROR:
+		if len(block.Offsets) < 1 {
+			return -1, IncompleteResponse
+		}
+		return block.Offsets[0], nil
+	case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION, LEADER_NOT_AVAILABLE:
+		if !retry {
+			return -1, block.Err
+		}
+		err = c.client.refreshTopic(c.topic)
+		if err != nil {
+			return -1, err
+		}
+		c.broker, err = c.client.leader(c.topic, c.partition)
+		if err != nil {
+			return -1, err
+		}
+		return c.getLatestOffset(false)
+	}
+
+	return -1, block.Err
+}

+ 12 - 0
offset_response.go

@@ -58,3 +58,15 @@ func (r *OffsetResponse) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock {
+	if r.Blocks == nil {
+		return nil
+	}
+
+	if r.Blocks[topic] == nil {
+		return nil
+	}
+
+	return r.Blocks[topic][partition]
+}