|
|
@@ -123,6 +123,13 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
|
|
|
for topic, partitionSet := range ps.msgs {
|
|
|
for partition, set := range partitionSet {
|
|
|
if req.Version >= 3 {
|
|
|
+ // If the API version we're hitting is 3 or greater, we need to calculate
|
|
|
+ // offsets for each record in the batch relative to FirstOffset.
|
|
|
+ // Additionally, we must set LastOffsetDelta to the value of the last offset
|
|
|
+ // in the batch. Since the OffsetDelta of the first record is 0, we know that the
|
|
|
+ // final record of any batch will have an offset of (# of records in batch) - 1.
|
|
|
+ // (See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
|
|
|
+ // under the RecordBatch section for details.)
|
|
|
rb := set.recordsToSend.RecordBatch
|
|
|
if len(rb.Records) > 0 {
|
|
|
rb.LastOffsetDelta = int32(len(rb.Records) - 1)
|