Просмотр исходного кода

Retry timed-out messages in the producer.

Per
https://mail-archives.apache.org/mod_mbox/kafka-users/201502.mbox/%3CCAHBV8WfeCqFqPsZVLbyaCNohLpDBXdZp_U6YnHaQoX20jcCG3g%40mail.gmail.com%3E
this is the upstream decision and I think it makes sense.
Evan Huus 11 лет назад
Родитель
Сommit
87dc588467
1 измененных файлов с 1 добавлено и 1 удалено
  1. 1 1
      producer.go

+ 1 - 1
producer.go

@@ -590,7 +590,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 						}
 						p.returnSuccesses(msgs)
 					}
-				case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
+				case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable, RequestTimedOut:
 					Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
 						broker.ID(), topic, partition, block.Err)
 					if currentRetries[topic] == nil {