|
|
@@ -13,6 +13,8 @@ type ConsumerConfig struct {
|
|
|
// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
|
|
|
// returns fewer than that anyways. The default of 0 is treated as no limit.
|
|
|
MaxWaitTime int32
|
|
|
+ // The offset to start fetching messages from
|
|
|
+ StartingOffset int64
|
|
|
}
|
|
|
|
|
|
// Consumer processes Kafka messages from a given topic and partition.
|
|
|
@@ -60,6 +62,10 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
|
|
|
return nil, ConfigurationError("Invalid MaxWaitTime")
|
|
|
}
|
|
|
|
|
|
+ if config.StartingOffset < 0 {
|
|
|
+ return nil, ConfigurationError("Invalid StartingOffset")
|
|
|
+ }
|
|
|
+
|
|
|
broker, err := client.leader(topic, partition)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
@@ -71,10 +77,7 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
|
|
|
c.partition = partition
|
|
|
c.group = group
|
|
|
c.config = *config
|
|
|
-
|
|
|
- // We should really be sending an OffsetFetchRequest, but that doesn't seem to
|
|
|
- // work in kafka yet. Hopefully will in beta 2...
|
|
|
- c.offset = 0
|
|
|
+ c.offset = config.StartingOffset
|
|
|
c.broker = broker
|
|
|
c.stopper = make(chan bool)
|
|
|
c.done = make(chan bool)
|