|
|
@@ -139,7 +139,6 @@ type MessageToSend struct {
|
|
|
Key, Value Encoder
|
|
|
|
|
|
// these are filled in by the producer as the message is processed
|
|
|
- broker *Broker
|
|
|
offset int64
|
|
|
partition int32
|
|
|
flags flagSet
|
|
|
@@ -312,7 +311,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
if backlog == nil {
|
|
|
// on the very first retried message we send off a chaser so that we know when everything "in between" has made it
|
|
|
// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
|
|
|
- output <- &MessageToSend{Topic: topic, partition: partition, broker: leader, flags: chaser}
|
|
|
+ output <- &MessageToSend{Topic: topic, partition: partition, flags: chaser}
|
|
|
Logger.Println("Producer dispatching retried messages to new leader.")
|
|
|
backlog = make([]*MessageToSend, 0)
|
|
|
p.unrefBrokerWorker(leader)
|
|
|
@@ -340,7 +339,6 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
}
|
|
|
|
|
|
for _, msg := range backlog {
|
|
|
- msg.broker = leader
|
|
|
output <- msg
|
|
|
}
|
|
|
Logger.Println("Producer backlog processsed.")
|
|
|
@@ -368,7 +366,6 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
output = p.getBrokerWorker(leader)
|
|
|
}
|
|
|
|
|
|
- msg.broker = leader
|
|
|
output <- msg
|
|
|
}
|
|
|
|
|
|
@@ -659,9 +656,8 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*MessageToSend) *Pr
|
|
|
|
|
|
if empty {
|
|
|
return nil
|
|
|
- } else {
|
|
|
- return req
|
|
|
}
|
|
|
+ return req
|
|
|
}
|
|
|
|
|
|
func (p *Producer) returnMessages(batch []*MessageToSend, err error) {
|