123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268 |
- package sarama
- import (
- "bytes"
- "compress/gzip"
- "fmt"
- "io/ioutil"
- "time"
- "github.com/eapache/go-xerial-snappy"
- "github.com/pierrec/lz4"
- )
- const recordBatchOverhead = 49
- type recordsArray []*Record
- func (e recordsArray) encode(pe packetEncoder) error {
- for _, r := range e {
- if err := r.encode(pe); err != nil {
- return err
- }
- }
- return nil
- }
- func (e recordsArray) decode(pd packetDecoder) error {
- for i := range e {
- rec := &Record{}
- if err := rec.decode(pd); err != nil {
- return err
- }
- e[i] = rec
- }
- return nil
- }
- type RecordBatch struct {
- FirstOffset int64
- PartitionLeaderEpoch int32
- Version int8
- Codec CompressionCodec
- CompressionLevel int
- Control bool
- LastOffsetDelta int32
- FirstTimestamp time.Time
- MaxTimestamp time.Time
- ProducerID int64
- ProducerEpoch int16
- FirstSequence int32
- Records []*Record
- PartialTrailingRecord bool
- compressedRecords []byte
- recordsLen int // uncompressed records size
- }
- func (b *RecordBatch) encode(pe packetEncoder) error {
- if b.Version != 2 {
- return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
- }
- pe.putInt64(b.FirstOffset)
- pe.push(&lengthField{})
- pe.putInt32(b.PartitionLeaderEpoch)
- pe.putInt8(b.Version)
- pe.push(newCRC32Field(crcCastagnoli))
- pe.putInt16(b.computeAttributes())
- pe.putInt32(b.LastOffsetDelta)
- if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
- return err
- }
- if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
- return err
- }
- pe.putInt64(b.ProducerID)
- pe.putInt16(b.ProducerEpoch)
- pe.putInt32(b.FirstSequence)
- if err := pe.putArrayLength(len(b.Records)); err != nil {
- return err
- }
- if b.compressedRecords == nil {
- if err := b.encodeRecords(pe); err != nil {
- return err
- }
- }
- if err := pe.putRawBytes(b.compressedRecords); err != nil {
- return err
- }
- if err := pe.pop(); err != nil {
- return err
- }
- return pe.pop()
- }
- func (b *RecordBatch) decode(pd packetDecoder) (err error) {
- if b.FirstOffset, err = pd.getInt64(); err != nil {
- return err
- }
- batchLen, err := pd.getInt32()
- if err != nil {
- return err
- }
- if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
- return err
- }
- if b.Version, err = pd.getInt8(); err != nil {
- return err
- }
- if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
- return err
- }
- attributes, err := pd.getInt16()
- if err != nil {
- return err
- }
- b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
- b.Control = attributes&controlMask == controlMask
- if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
- return err
- }
- if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
- return err
- }
- if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
- return err
- }
- if b.ProducerID, err = pd.getInt64(); err != nil {
- return err
- }
- if b.ProducerEpoch, err = pd.getInt16(); err != nil {
- return err
- }
- if b.FirstSequence, err = pd.getInt32(); err != nil {
- return err
- }
- numRecs, err := pd.getArrayLength()
- if err != nil {
- return err
- }
- if numRecs >= 0 {
- b.Records = make([]*Record, numRecs)
- }
- bufSize := int(batchLen) - recordBatchOverhead
- recBuffer, err := pd.getRawBytes(bufSize)
- if err != nil {
- if err == ErrInsufficientData {
- b.PartialTrailingRecord = true
- b.Records = nil
- return nil
- }
- return err
- }
- if err = pd.pop(); err != nil {
- return err
- }
- switch b.Codec {
- case CompressionNone:
- case CompressionGZIP:
- reader, err := gzip.NewReader(bytes.NewReader(recBuffer))
- if err != nil {
- return err
- }
- if recBuffer, err = ioutil.ReadAll(reader); err != nil {
- return err
- }
- case CompressionSnappy:
- if recBuffer, err = snappy.Decode(recBuffer); err != nil {
- return err
- }
- case CompressionLZ4:
- reader := lz4.NewReader(bytes.NewReader(recBuffer))
- if recBuffer, err = ioutil.ReadAll(reader); err != nil {
- return err
- }
- default:
- return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
- }
- b.recordsLen = len(recBuffer)
- err = decode(recBuffer, recordsArray(b.Records))
- if err == ErrInsufficientData {
- b.PartialTrailingRecord = true
- b.Records = nil
- return nil
- }
- return err
- }
- func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
- var raw []byte
- var err error
- if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
- return err
- }
- b.recordsLen = len(raw)
- switch b.Codec {
- case CompressionNone:
- b.compressedRecords = raw
- case CompressionGZIP:
- var buf bytes.Buffer
- var writer *gzip.Writer
- if b.CompressionLevel != CompressionLevelDefault {
- writer, err = gzip.NewWriterLevel(&buf, b.CompressionLevel)
- if err != nil {
- return err
- }
- } else {
- writer = gzip.NewWriter(&buf)
- }
- if _, err := writer.Write(raw); err != nil {
- return err
- }
- if err := writer.Close(); err != nil {
- return err
- }
- b.compressedRecords = buf.Bytes()
- case CompressionSnappy:
- b.compressedRecords = snappy.Encode(raw)
- case CompressionLZ4:
- var buf bytes.Buffer
- writer := lz4.NewWriter(&buf)
- if _, err := writer.Write(raw); err != nil {
- return err
- }
- if err := writer.Close(); err != nil {
- return err
- }
- b.compressedRecords = buf.Bytes()
- default:
- return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
- }
- return nil
- }
- func (b *RecordBatch) computeAttributes() int16 {
- attr := int16(b.Codec) & int16(compressionCodecMask)
- if b.Control {
- attr |= controlMask
- }
- return attr
- }
- func (b *RecordBatch) addRecord(r *Record) {
- b.Records = append(b.Records, r)
- }
|