|
@@ -1,6 +1,7 @@
|
|
|
package kafka
|
|
package kafka
|
|
|
|
|
|
|
|
import k "sarama/protocol"
|
|
import k "sarama/protocol"
|
|
|
|
|
+import "time"
|
|
|
|
|
|
|
|
type Producer struct {
|
|
type Producer struct {
|
|
|
client *Client
|
|
client *Client
|
|
@@ -79,6 +80,17 @@ func (p *Producer) safeSendMessage(key, value Encoder, retries int) error {
|
|
|
switch block.Err {
|
|
switch block.Err {
|
|
|
case k.NO_ERROR:
|
|
case k.NO_ERROR:
|
|
|
return nil
|
|
return nil
|
|
|
|
|
+ case k.LEADER_NOT_AVAILABLE:
|
|
|
|
|
+ if retries <= 0 {
|
|
|
|
|
+ return block.Err
|
|
|
|
|
+ }
|
|
|
|
|
+ // wait for leader election to finish
|
|
|
|
|
+ time.Sleep(250 * time.Millisecond)
|
|
|
|
|
+ err = p.client.cache.refreshTopic(p.topic)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ return p.safeSendMessage(key, value, retries-1)
|
|
|
case k.UNKNOWN_TOPIC_OR_PARTITION, k.NOT_LEADER_FOR_PARTITION:
|
|
case k.UNKNOWN_TOPIC_OR_PARTITION, k.NOT_LEADER_FOR_PARTITION:
|
|
|
if retries <= 0 {
|
|
if retries <= 0 {
|
|
|
return block.Err
|
|
return block.Err
|