|
@@ -194,7 +194,7 @@ func (c *Consumer) fetchMessages() {
|
|
|
}
|
|
}
|
|
|
default:
|
|
default:
|
|
|
c.client.disconnectBroker(c.broker)
|
|
c.client.disconnectBroker(c.broker)
|
|
|
- for c.broker = nil; err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
|
|
|
|
|
|
|
+ for c.broker, err = c.client.Leader(c.topic, c.partition); err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
|
|
|
if !c.sendError(err) {
|
|
if !c.sendError(err) {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
@@ -217,7 +217,7 @@ func (c *Consumer) fetchMessages() {
|
|
|
case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION, LEADER_NOT_AVAILABLE:
|
|
case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION, LEADER_NOT_AVAILABLE:
|
|
|
err = c.client.RefreshTopicMetadata(c.topic)
|
|
err = c.client.RefreshTopicMetadata(c.topic)
|
|
|
if c.sendError(err) {
|
|
if c.sendError(err) {
|
|
|
- for c.broker = nil; err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
|
|
|
|
|
|
|
+ for c.broker, err = c.client.Leader(c.topic, c.partition); err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
|
|
|
if !c.sendError(err) {
|
|
if !c.sendError(err) {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|