|
|
@@ -50,7 +50,7 @@ type RecordBatch struct {
|
|
|
PartialTrailingRecord bool
|
|
|
|
|
|
compressedRecords []byte
|
|
|
- recordsLen int
|
|
|
+ recordsLen int // uncompressed records size
|
|
|
}
|
|
|
|
|
|
func (b *RecordBatch) encode(pe packetEncoder) error {
|
|
|
@@ -90,12 +90,15 @@ func (b *RecordBatch) encode(pe packetEncoder) error {
|
|
|
if raw, err = encode(recordsArray(b.Records), nil); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+ b.recordsLen = len(raw)
|
|
|
}
|
|
|
switch b.Codec {
|
|
|
case CompressionNone:
|
|
|
+ offset := pe.offset()
|
|
|
if err := recordsArray(b.Records).encode(pe); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+ b.recordsLen = pe.offset() - offset
|
|
|
case CompressionGZIP:
|
|
|
var buf bytes.Buffer
|
|
|
writer := gzip.NewWriter(&buf)
|
|
|
@@ -242,18 +245,6 @@ func (b *RecordBatch) computeAttributes() int16 {
|
|
|
return attr
|
|
|
}
|
|
|
|
|
|
-func (b *RecordBatch) computeRecordsLength() error {
|
|
|
- b.recordsLen = 0
|
|
|
- for _, r := range b.Records {
|
|
|
- l, err := r.getTotalLength()
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- b.recordsLen += l
|
|
|
- }
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
func (b *RecordBatch) addRecord(r *Record) {
|
|
|
b.Records = append(b.Records, r)
|
|
|
}
|