|
|
@@ -269,10 +269,8 @@ func (c *Consumer) fetchMessages() {
|
|
|
}
|
|
|
|
|
|
func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
|
|
|
- request := &OffsetRequest{}
|
|
|
- request.AddBlock(c.topic, c.partition, where, 1)
|
|
|
+ offset, err := c.client.GetOffset(c.topic, c.partition, where)
|
|
|
|
|
|
- response, err := c.broker.GetAvailableOffsets(c.client.id, request)
|
|
|
switch err {
|
|
|
case nil:
|
|
|
break
|
|
|
@@ -282,42 +280,30 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
|
|
|
if !retry {
|
|
|
return -1, err
|
|
|
}
|
|
|
- Logger.Printf("Unexpected error processing OffsetRequest; disconnecting broker %s: %s\n", c.broker.addr, err)
|
|
|
- c.client.disconnectBroker(c.broker)
|
|
|
- c.broker, err = c.client.Leader(c.topic, c.partition)
|
|
|
- if err != nil {
|
|
|
- return -1, err
|
|
|
- }
|
|
|
- return c.getOffset(where, false)
|
|
|
- }
|
|
|
|
|
|
- block := response.GetBlock(c.topic, c.partition)
|
|
|
- if block == nil {
|
|
|
- return -1, IncompleteResponse
|
|
|
- }
|
|
|
+ switch err.(type) {
|
|
|
+ case KError:
|
|
|
+ switch err {
|
|
|
+ case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
|
|
|
+ err = c.client.RefreshTopicMetadata(c.topic)
|
|
|
+ if err != nil {
|
|
|
+ return -1, err
|
|
|
+ }
|
|
|
+ default:
|
|
|
+ Logger.Printf("Unexpected error processing OffsetRequest; disconnecting broker %s: %s\n", c.broker.addr, err)
|
|
|
+ c.client.disconnectBroker(c.broker)
|
|
|
|
|
|
- switch block.Err {
|
|
|
- case NoError:
|
|
|
- if len(block.Offsets) < 1 {
|
|
|
- return -1, IncompleteResponse
|
|
|
- }
|
|
|
- return block.Offsets[0], nil
|
|
|
- case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
|
|
|
- if !retry {
|
|
|
- return -1, block.Err
|
|
|
- }
|
|
|
- err = c.client.RefreshTopicMetadata(c.topic)
|
|
|
- if err != nil {
|
|
|
- return -1, err
|
|
|
- }
|
|
|
- c.broker, err = c.client.Leader(c.topic, c.partition)
|
|
|
- if err != nil {
|
|
|
- return -1, err
|
|
|
+ broker, brokerErr := c.client.Leader(c.topic, c.partition)
|
|
|
+ if brokerErr != nil {
|
|
|
+ return -1, brokerErr
|
|
|
+ }
|
|
|
+ c.broker = broker
|
|
|
+ }
|
|
|
+ return c.getOffset(where, false)
|
|
|
}
|
|
|
- return c.getOffset(where, false)
|
|
|
+ return -1, err
|
|
|
}
|
|
|
-
|
|
|
- return -1, block.Err
|
|
|
+ return offset, nil
|
|
|
}
|
|
|
|
|
|
// Creates a ConsumerConfig instance with sane defaults.
|