If we don't do this we'll foist it off on the flusher, which is legit but means we waste our retry. We can fail fast here and the resulting behaviour will be better.
@@ -367,6 +367,10 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
return err
}
+ if _, err = leader.Connected(); err != nil {
+ return err
+ }
+
output = p.getBrokerWorker(leader)
return nil