|
|
@@ -136,6 +136,11 @@ func (m *ProducerMessage) byteSize() int {
|
|
|
return size
|
|
|
}
|
|
|
|
|
|
+func (m *ProducerMessage) clear() {
|
|
|
+ m.flags = 0
|
|
|
+ m.retries = 0
|
|
|
+}
|
|
|
+
|
|
|
// ProducerError is the type of error generated when the producer fails to deliver a message.
|
|
|
// It contains the original ProducerMessage as well as the actual error value.
|
|
|
type ProducerError struct {
|
|
|
@@ -718,8 +723,7 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa
|
|
|
}
|
|
|
|
|
|
func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
|
|
|
- msg.flags = 0
|
|
|
- msg.retries = 0
|
|
|
+ msg.clear()
|
|
|
pErr := &ProducerError{Msg: msg, Err: err}
|
|
|
if p.conf.Producer.Return.Errors {
|
|
|
p.errors <- pErr
|
|
|
@@ -743,7 +747,7 @@ func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
|
|
|
continue
|
|
|
}
|
|
|
if p.conf.Producer.Return.Successes {
|
|
|
- msg.flags = 0
|
|
|
+ msg.clear()
|
|
|
p.successes <- msg
|
|
|
}
|
|
|
p.inFlight.Done()
|