When we get a new leader, check it is connected
@@ -367,6 +367,10 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
return err
}
+ if _, err = leader.Connected(); err != nil {
+ return err
+ }
+
output = p.getBrokerWorker(leader)
return nil