Browse Source

Refresh the broker after refreshing the topic

Evan Huus 12 years ago
parent
commit
0f657905ea
1 changed files with 5 additions and 0 deletions
  1. 5 0
      kafka/consumer.go

+ 5 - 0
kafka/consumer.go

@@ -132,6 +132,11 @@ func (c *Consumer) fetchMessages() {
 		case types.UNKNOWN_TOPIC_OR_PARTITION, types.NOT_LEADER_FOR_PARTITION, types.LEADER_NOT_AVAILABLE:
 			err = c.client.refreshTopic(c.topic)
 			if c.sendError(err) {
+				for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
+					if !c.sendError(err) {
+						return
+					}
+				}
 				continue
 			} else {
 				return