Evan Huus 12 лет назад
Родитель
Сommit
0a5f61b3b3
1 измененных файлов с 21 добавлено и 2 удалено
  1. 21 2
      kafka/producer.go

+ 21 - 2
kafka/producer.go

@@ -65,7 +65,20 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
 	}
 	}
 
 
 	broker, err := p.client.leader(p.topic, partition)
 	broker, err := p.client.leader(p.topic, partition)
-	if err != nil {
+	switch t := err.(type) {
+	case k.KError:
+		if t == k.LEADER_NOT_AVAILABLE {
+			time.Sleep(250 * time.Millisecond) // wait for leader election
+			broker, err = p.client.leader(p.topic, partition)
+			if err != nil {
+				return err
+			}
+		} else {
+			return err
+		}
+	case nil:
+		break
+	default:
 		return err
 		return err
 	}
 	}
 
 
@@ -73,8 +86,14 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
 	request.AddMessage(p.topic, partition, &k.Message{Key: keyBytes, Value: valBytes})
 	request.AddMessage(p.topic, partition, &k.Message{Key: keyBytes, Value: valBytes})
 
 
 	response, err := broker.Produce(p.client.id, request)
 	response, err := broker.Produce(p.client.id, request)
-	if err != nil {
+	switch err.(type) {
+	case k.EncodingError:
 		return err
 		return err
+	case nil:
+		break
+	default:
+		p.client.disconnectBroker(broker)
+		return p.safeSendMessage(key, value, true)
 	}
 	}
 
 
 	if response == nil {
 	if response == nil {