package sarama import ( "fmt" "time" ) const recordBatchOverhead = 49 type recordsArray []*Record func (e recordsArray) encode(pe packetEncoder) error { for _, r := range e { if err := r.encode(pe); err != nil { return err } } return nil } func (e recordsArray) decode(pd packetDecoder) error { for i := range e { rec := &Record{} if err := rec.decode(pd); err != nil { return err } e[i] = rec } return nil } type RecordBatch struct { FirstOffset int64 PartitionLeaderEpoch int32 Version int8 Codec CompressionCodec CompressionLevel int Control bool LogAppendTime bool LastOffsetDelta int32 FirstTimestamp time.Time MaxTimestamp time.Time ProducerID int64 ProducerEpoch int16 FirstSequence int32 Records []*Record PartialTrailingRecord bool IsTransactional bool compressedRecords []byte recordsLen int // uncompressed records size } func (b *RecordBatch) LastOffset() int64 { return b.FirstOffset + int64(b.LastOffsetDelta) } func (b *RecordBatch) encode(pe packetEncoder) error { if b.Version != 2 { return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)} } pe.putInt64(b.FirstOffset) pe.push(&lengthField{}) pe.putInt32(b.PartitionLeaderEpoch) pe.putInt8(b.Version) pe.push(newCRC32Field(crcCastagnoli)) pe.putInt16(b.computeAttributes()) pe.putInt32(b.LastOffsetDelta) if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil { return err } if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil { return err } pe.putInt64(b.ProducerID) pe.putInt16(b.ProducerEpoch) pe.putInt32(b.FirstSequence) if err := pe.putArrayLength(len(b.Records)); err != nil { return err } if b.compressedRecords == nil { if err := b.encodeRecords(pe); err != nil { return err } } if err := pe.putRawBytes(b.compressedRecords); err != nil { return err } if err := pe.pop(); err != nil { return err } return pe.pop() } func (b *RecordBatch) decode(pd packetDecoder) (err error) { if b.FirstOffset, err = pd.getInt64(); err != nil { return err } batchLen, err := pd.getInt32() if err != nil { return err } if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil { return err } if b.Version, err = pd.getInt8(); err != nil { return err } crc32Decoder := acquireCrc32Field(crcCastagnoli) defer releaseCrc32Field(crc32Decoder) if err = pd.push(crc32Decoder); err != nil { return err } attributes, err := pd.getInt16() if err != nil { return err } b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask) b.Control = attributes&controlMask == controlMask b.LogAppendTime = attributes×tampTypeMask == timestampTypeMask b.IsTransactional = attributes&isTransactionalMask == isTransactionalMask if b.LastOffsetDelta, err = pd.getInt32(); err != nil { return err } if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil { return err } if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil { return err } if b.ProducerID, err = pd.getInt64(); err != nil { return err } if b.ProducerEpoch, err = pd.getInt16(); err != nil { return err } if b.FirstSequence, err = pd.getInt32(); err != nil { return err } numRecs, err := pd.getArrayLength() if err != nil { return err } if numRecs >= 0 { b.Records = make([]*Record, numRecs) } bufSize := int(batchLen) - recordBatchOverhead recBuffer, err := pd.getRawBytes(bufSize) if err != nil { if err == ErrInsufficientData { b.PartialTrailingRecord = true b.Records = nil return nil } return err } if err = pd.pop(); err != nil { return err } recBuffer, err = decompress(b.Codec, recBuffer) if err != nil { return err } b.recordsLen = len(recBuffer) err = decode(recBuffer, recordsArray(b.Records)) if err == ErrInsufficientData { b.PartialTrailingRecord = true b.Records = nil return nil } return err } func (b *RecordBatch) encodeRecords(pe packetEncoder) error { var raw []byte var err error if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil { return err } b.recordsLen = len(raw) b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw) return err } func (b *RecordBatch) computeAttributes() int16 { attr := int16(b.Codec) & int16(compressionCodecMask) if b.Control { attr |= controlMask } if b.LogAppendTime { attr |= timestampTypeMask } if b.IsTransactional { attr |= isTransactionalMask } return attr } func (b *RecordBatch) addRecord(r *Record) { b.Records = append(b.Records, r) }