瀏覽代碼

Document recordbatch offset changes

Ken Schneider 7 年之前
父節點
當前提交
97f65c7894
共有 1 個文件被更改,包括 7 次插入0 次删除
  1. 7 0
      produce_set.go

+ 7 - 0
produce_set.go

@@ -123,6 +123,13 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 	for topic, partitionSet := range ps.msgs {
 		for partition, set := range partitionSet {
 			if req.Version >= 3 {
+				// If the API version we're hitting is 3 or greater, we need to calculate
+				// offsets for each record in the batch relative to FirstOffset.
+				// Additionally, we must set LastOffsetDelta to the value of the last offset
+				// in the batch. Since the OffsetDelta of the first record is 0, we know that the
+				// final record of any batch will have an offset of (# of records in batch) - 1.
+				// (See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
+				//  under the RecordBatch section for details.)
 				rb := set.recordsToSend.RecordBatch
 				if len(rb.Records) > 0 {
 					rb.LastOffsetDelta = int32(len(rb.Records) - 1)