123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 |
- package sarama
- import (
- "fmt"
- "time"
- )
- const recordBatchOverhead = 49
- type recordsArray []*Record
- func (e recordsArray) encode(pe packetEncoder) error {
- for _, r := range e {
- if err := r.encode(pe); err != nil {
- return err
- }
- }
- return nil
- }
- func (e recordsArray) decode(pd packetDecoder) error {
- for i := range e {
- rec := &Record{}
- if err := rec.decode(pd); err != nil {
- return err
- }
- e[i] = rec
- }
- return nil
- }
- type RecordBatch struct {
- FirstOffset int64
- PartitionLeaderEpoch int32
- Version int8
- Codec CompressionCodec
- CompressionLevel int
- Control bool
- LogAppendTime bool
- LastOffsetDelta int32
- FirstTimestamp time.Time
- MaxTimestamp time.Time
- ProducerID int64
- ProducerEpoch int16
- FirstSequence int32
- Records []*Record
- PartialTrailingRecord bool
- IsTransactional bool
- compressedRecords []byte
- recordsLen int // uncompressed records size
- }
- func (b *RecordBatch) LastOffset() int64 {
- return b.FirstOffset + int64(b.LastOffsetDelta)
- }
- func (b *RecordBatch) encode(pe packetEncoder) error {
- if b.Version != 2 {
- return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
- }
- pe.putInt64(b.FirstOffset)
- pe.push(&lengthField{})
- pe.putInt32(b.PartitionLeaderEpoch)
- pe.putInt8(b.Version)
- pe.push(newCRC32Field(crcCastagnoli))
- pe.putInt16(b.computeAttributes())
- pe.putInt32(b.LastOffsetDelta)
- if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
- return err
- }
- if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
- return err
- }
- pe.putInt64(b.ProducerID)
- pe.putInt16(b.ProducerEpoch)
- pe.putInt32(b.FirstSequence)
- if err := pe.putArrayLength(len(b.Records)); err != nil {
- return err
- }
- if b.compressedRecords == nil {
- if err := b.encodeRecords(pe); err != nil {
- return err
- }
- }
- if err := pe.putRawBytes(b.compressedRecords); err != nil {
- return err
- }
- if err := pe.pop(); err != nil {
- return err
- }
- return pe.pop()
- }
- func (b *RecordBatch) decode(pd packetDecoder) (err error) {
- if b.FirstOffset, err = pd.getInt64(); err != nil {
- return err
- }
- batchLen, err := pd.getInt32()
- if err != nil {
- return err
- }
- if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
- return err
- }
- if b.Version, err = pd.getInt8(); err != nil {
- return err
- }
- crc32Decoder := acquireCrc32Field(crcCastagnoli)
- defer releaseCrc32Field(crc32Decoder)
- if err = pd.push(crc32Decoder); err != nil {
- return err
- }
- attributes, err := pd.getInt16()
- if err != nil {
- return err
- }
- b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
- b.Control = attributes&controlMask == controlMask
- b.LogAppendTime = attributes×tampTypeMask == timestampTypeMask
- b.IsTransactional = attributes&isTransactionalMask == isTransactionalMask
- if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
- return err
- }
- if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
- return err
- }
- if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
- return err
- }
- if b.ProducerID, err = pd.getInt64(); err != nil {
- return err
- }
- if b.ProducerEpoch, err = pd.getInt16(); err != nil {
- return err
- }
- if b.FirstSequence, err = pd.getInt32(); err != nil {
- return err
- }
- numRecs, err := pd.getArrayLength()
- if err != nil {
- return err
- }
- if numRecs >= 0 {
- b.Records = make([]*Record, numRecs)
- }
- bufSize := int(batchLen) - recordBatchOverhead
- recBuffer, err := pd.getRawBytes(bufSize)
- if err != nil {
- if err == ErrInsufficientData {
- b.PartialTrailingRecord = true
- b.Records = nil
- return nil
- }
- return err
- }
- if err = pd.pop(); err != nil {
- return err
- }
- recBuffer, err = decompress(b.Codec, recBuffer)
- if err != nil {
- return err
- }
- b.recordsLen = len(recBuffer)
- err = decode(recBuffer, recordsArray(b.Records))
- if err == ErrInsufficientData {
- b.PartialTrailingRecord = true
- b.Records = nil
- return nil
- }
- return err
- }
- func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
- var raw []byte
- var err error
- if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
- return err
- }
- b.recordsLen = len(raw)
- b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw)
- return err
- }
- func (b *RecordBatch) computeAttributes() int16 {
- attr := int16(b.Codec) & int16(compressionCodecMask)
- if b.Control {
- attr |= controlMask
- }
- if b.LogAppendTime {
- attr |= timestampTypeMask
- }
- if b.IsTransactional {
- attr |= isTransactionalMask
- }
- return attr
- }
- func (b *RecordBatch) addRecord(r *Record) {
- b.Records = append(b.Records, r)
- }
|