record_batch.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package sarama
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. const recordBatchOverhead = 49
  7. type recordsArray []*Record
  8. func (e recordsArray) encode(pe packetEncoder) error {
  9. for _, r := range e {
  10. if err := r.encode(pe); err != nil {
  11. return err
  12. }
  13. }
  14. return nil
  15. }
  16. func (e recordsArray) decode(pd packetDecoder) error {
  17. for i := range e {
  18. rec := &Record{}
  19. if err := rec.decode(pd); err != nil {
  20. return err
  21. }
  22. e[i] = rec
  23. }
  24. return nil
  25. }
  26. type RecordBatch struct {
  27. FirstOffset int64
  28. PartitionLeaderEpoch int32
  29. Version int8
  30. Codec CompressionCodec
  31. CompressionLevel int
  32. Control bool
  33. LogAppendTime bool
  34. LastOffsetDelta int32
  35. FirstTimestamp time.Time
  36. MaxTimestamp time.Time
  37. ProducerID int64
  38. ProducerEpoch int16
  39. FirstSequence int32
  40. Records []*Record
  41. PartialTrailingRecord bool
  42. IsTransactional bool
  43. compressedRecords []byte
  44. recordsLen int // uncompressed records size
  45. }
  46. func (b *RecordBatch) LastOffset() int64 {
  47. return b.FirstOffset + int64(b.LastOffsetDelta)
  48. }
  49. func (b *RecordBatch) encode(pe packetEncoder) error {
  50. if b.Version != 2 {
  51. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  52. }
  53. pe.putInt64(b.FirstOffset)
  54. pe.push(&lengthField{})
  55. pe.putInt32(b.PartitionLeaderEpoch)
  56. pe.putInt8(b.Version)
  57. pe.push(newCRC32Field(crcCastagnoli))
  58. pe.putInt16(b.computeAttributes())
  59. pe.putInt32(b.LastOffsetDelta)
  60. if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
  61. return err
  62. }
  63. if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
  64. return err
  65. }
  66. pe.putInt64(b.ProducerID)
  67. pe.putInt16(b.ProducerEpoch)
  68. pe.putInt32(b.FirstSequence)
  69. if err := pe.putArrayLength(len(b.Records)); err != nil {
  70. return err
  71. }
  72. if b.compressedRecords == nil {
  73. if err := b.encodeRecords(pe); err != nil {
  74. return err
  75. }
  76. }
  77. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  78. return err
  79. }
  80. if err := pe.pop(); err != nil {
  81. return err
  82. }
  83. return pe.pop()
  84. }
  85. func (b *RecordBatch) decode(pd packetDecoder) (err error) {
  86. if b.FirstOffset, err = pd.getInt64(); err != nil {
  87. return err
  88. }
  89. batchLen, err := pd.getInt32()
  90. if err != nil {
  91. return err
  92. }
  93. if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
  94. return err
  95. }
  96. if b.Version, err = pd.getInt8(); err != nil {
  97. return err
  98. }
  99. crc32Decoder := acquireCrc32Field(crcCastagnoli)
  100. defer releaseCrc32Field(crc32Decoder)
  101. if err = pd.push(crc32Decoder); err != nil {
  102. return err
  103. }
  104. attributes, err := pd.getInt16()
  105. if err != nil {
  106. return err
  107. }
  108. b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
  109. b.Control = attributes&controlMask == controlMask
  110. b.LogAppendTime = attributes&timestampTypeMask == timestampTypeMask
  111. b.IsTransactional = attributes&isTransactionalMask == isTransactionalMask
  112. if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
  113. return err
  114. }
  115. if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
  116. return err
  117. }
  118. if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
  119. return err
  120. }
  121. if b.ProducerID, err = pd.getInt64(); err != nil {
  122. return err
  123. }
  124. if b.ProducerEpoch, err = pd.getInt16(); err != nil {
  125. return err
  126. }
  127. if b.FirstSequence, err = pd.getInt32(); err != nil {
  128. return err
  129. }
  130. numRecs, err := pd.getArrayLength()
  131. if err != nil {
  132. return err
  133. }
  134. if numRecs >= 0 {
  135. b.Records = make([]*Record, numRecs)
  136. }
  137. bufSize := int(batchLen) - recordBatchOverhead
  138. recBuffer, err := pd.getRawBytes(bufSize)
  139. if err != nil {
  140. if err == ErrInsufficientData {
  141. b.PartialTrailingRecord = true
  142. b.Records = nil
  143. return nil
  144. }
  145. return err
  146. }
  147. if err = pd.pop(); err != nil {
  148. return err
  149. }
  150. recBuffer, err = decompress(b.Codec, recBuffer)
  151. if err != nil {
  152. return err
  153. }
  154. b.recordsLen = len(recBuffer)
  155. err = decode(recBuffer, recordsArray(b.Records))
  156. if err == ErrInsufficientData {
  157. b.PartialTrailingRecord = true
  158. b.Records = nil
  159. return nil
  160. }
  161. return err
  162. }
  163. func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
  164. var raw []byte
  165. var err error
  166. if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
  167. return err
  168. }
  169. b.recordsLen = len(raw)
  170. b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw)
  171. return err
  172. }
  173. func (b *RecordBatch) computeAttributes() int16 {
  174. attr := int16(b.Codec) & int16(compressionCodecMask)
  175. if b.Control {
  176. attr |= controlMask
  177. }
  178. if b.LogAppendTime {
  179. attr |= timestampTypeMask
  180. }
  181. if b.IsTransactional {
  182. attr |= isTransactionalMask
  183. }
  184. return attr
  185. }
  186. func (b *RecordBatch) addRecord(r *Record) {
  187. b.Records = append(b.Records, r)
  188. }