|
|
@@ -52,7 +52,12 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
|
|
|
}
|
|
|
|
|
|
set.msgs = append(set.msgs, msg)
|
|
|
- set.setToSend.addMessage(&Message{Codec: CompressionNone, Key: key, Value: val})
|
|
|
+ msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
|
|
|
+ if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) && !msg.Timestamp.IsZero() {
|
|
|
+ msgToSend.Timestamp = msg.Timestamp
|
|
|
+ msgToSend.Version = 1
|
|
|
+ }
|
|
|
+ set.setToSend.addMessage(msgToSend)
|
|
|
|
|
|
size := producerMessageOverhead + len(key) + len(val)
|
|
|
set.bufferBytes += size
|