|
|
@@ -4,10 +4,12 @@ package sarama
|
|
|
type OffsetMethod int
|
|
|
|
|
|
const (
|
|
|
- // When RAW_OFFSET is passed as the OffsetMethod, OffsetValue is interpreted as the raw offset to start at.
|
|
|
- RAW_OFFSET OffsetMethod = iota
|
|
|
- // When LATEST_OFFSET is passed as the OffsetMethod, the broker is queried for the most recent valid offset.
|
|
|
- LATEST_OFFSET OffsetMethod = iota
|
|
|
+ // METHOD_MANUAL causes the consumer to interpret the OffsetValue in the ConsumerConfig as the
|
|
|
+ // offset at which to start, allowing the user to manually specify their desired starting offset.
|
|
|
+ METHOD_MANUAL OffsetMethod = iota
|
|
|
+ // METHOD_NEWEST causes the consumer to start at the most recent available offset, as determined
|
|
|
+ // by querying the broker.
|
|
|
+ METHOD_NEWEST OffsetMethod = iota
|
|
|
)
|
|
|
|
|
|
// ConsumerConfig is used to pass multiple configuration options to NewConsumer.
|
|
|
@@ -23,9 +25,9 @@ type ConsumerConfig struct {
|
|
|
// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
|
|
|
// returns fewer than that anyways. The default of 0 is treated as no limit.
|
|
|
MaxWaitTime int32
|
|
|
- // The method of determining which offset to start at
|
|
|
+ // The method used to determine at which offset to begin consuming messages.
|
|
|
OffsetMethod OffsetMethod
|
|
|
- // Interpreted according to OffsetMethod
|
|
|
+ // Interpreted differently according to the value of OffsetMethod.
|
|
|
OffsetValue int64
|
|
|
}
|
|
|
|
|
|
@@ -95,12 +97,12 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
|
|
|
c.broker = broker
|
|
|
|
|
|
switch config.OffsetMethod {
|
|
|
- case RAW_OFFSET:
|
|
|
+ case METHOD_MANUAL:
|
|
|
if config.OffsetValue < 0 {
|
|
|
- return nil, ConfigurationError("OffsetValue cannot be < 0 when OffsetMethod is RAW_OFFSET")
|
|
|
+ return nil, ConfigurationError("OffsetValue cannot be < 0 when OffsetMethod is MANUAL")
|
|
|
}
|
|
|
c.offset = config.OffsetValue
|
|
|
- case LATEST_OFFSET:
|
|
|
+ case METHOD_NEWEST:
|
|
|
c.offset, err = c.getLatestOffset(true)
|
|
|
if err != nil {
|
|
|
return nil, err
|