|
@@ -208,21 +208,15 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
|
|
|
|
|
|
|
|
func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
|
|
func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
|
|
|
var raw []byte
|
|
var raw []byte
|
|
|
- if b.Codec != CompressionNone {
|
|
|
|
|
- var err error
|
|
|
|
|
- if raw, err = encode(recordsArray(b.Records), nil); err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
- b.recordsLen = len(raw)
|
|
|
|
|
|
|
+ var err error
|
|
|
|
|
+ if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
|
|
|
|
|
+ return err
|
|
|
}
|
|
}
|
|
|
|
|
+ b.recordsLen = len(raw)
|
|
|
|
|
|
|
|
switch b.Codec {
|
|
switch b.Codec {
|
|
|
case CompressionNone:
|
|
case CompressionNone:
|
|
|
- offset := pe.offset()
|
|
|
|
|
- if err := recordsArray(b.Records).encode(pe); err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
- b.recordsLen = pe.offset() - offset
|
|
|
|
|
|
|
+ b.compressedRecords = raw
|
|
|
case CompressionGZIP:
|
|
case CompressionGZIP:
|
|
|
var buf bytes.Buffer
|
|
var buf bytes.Buffer
|
|
|
writer := gzip.NewWriter(&buf)
|
|
writer := gzip.NewWriter(&buf)
|