@@ -132,10 +132,16 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
// and sent as the payload of a single fake "message" with the appropriate codec
// set and no key. When the server sees a message with a compression codec, it
// decompresses the payload and treats the result as its message set.
- for i, msg := range set.recordsToSend.msgSet.Messages {
+
- // Assign relative offsets to the inner messages. This lets
+ if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
- // the broker avoid recompressing the message set.
+ // If our version is 0.10 or later, assign relative offsets
- msg.Offset = int64(i)
+ // to the inner messages. This lets the broker avoid
+ // recompressing the message set.
+ // (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets
+ // for details on relative offsets.)
+ for i, msg := range set.recordsToSend.msgSet.Messages {
+ msg.Offset = int64(i)
+ }
}
payload, err := encode(set.recordsToSend.msgSet, ps.parent.conf.MetricRegistry)
if err != nil {