// record_batch.go

  1. package sarama
import (
	"errors"
	"fmt"
	"time"
)
  6. const recordBatchOverhead = 49
  7. type recordsArray []*Record
  8. func (e recordsArray) encode(pe packetEncoder) error {
  9. for _, r := range e {
  10. if err := r.encode(pe); err != nil {
  11. return err
  12. }
  13. }
  14. return nil
  15. }
  16. func (e recordsArray) decode(pd packetDecoder) error {
  17. for i := range e {
  18. rec := &Record{}
  19. if err := rec.decode(pd); err != nil {
  20. return err
  21. }
  22. e[i] = rec
  23. }
  24. return nil
  25. }
  26. type RecordBatch struct {
  27. FirstOffset int64
  28. PartitionLeaderEpoch int32
  29. Version int8
  30. Codec CompressionCodec
  31. CompressionLevel int
  32. Control bool
  33. LogAppendTime bool
  34. LastOffsetDelta int32
  35. FirstTimestamp time.Time
  36. MaxTimestamp time.Time
  37. ProducerID int64
  38. ProducerEpoch int16
  39. FirstSequence int32
  40. Records []*Record
  41. PartialTrailingRecord bool
  42. IsTransactional bool
  43. compressedRecords []byte
  44. recordsLen int // uncompressed records size
  45. }
  46. func (b *RecordBatch) LastOffset() int64 {
  47. return b.FirstOffset + int64(b.LastOffsetDelta)
  48. }
  49. func (b *RecordBatch) encode(pe packetEncoder) error {
  50. if b.Version != 2 {
  51. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  52. }
  53. pe.putInt64(b.FirstOffset)
  54. pe.push(&lengthField{})
  55. pe.putInt32(b.PartitionLeaderEpoch)
  56. pe.putInt8(b.Version)
  57. pe.push(newCRC32Field(crcCastagnoli))
  58. pe.putInt16(b.computeAttributes())
  59. pe.putInt32(b.LastOffsetDelta)
  60. if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
  61. return err
  62. }
  63. if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
  64. return err
  65. }
  66. pe.putInt64(b.ProducerID)
  67. pe.putInt16(b.ProducerEpoch)
  68. pe.putInt32(b.FirstSequence)
  69. if err := pe.putArrayLength(len(b.Records)); err != nil {
  70. return err
  71. }
  72. if b.compressedRecords == nil {
  73. if err := b.encodeRecords(pe); err != nil {
  74. return err
  75. }
  76. }
  77. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  78. return err
  79. }
  80. if err := pe.pop(); err != nil {
  81. return err
  82. }
  83. return pe.pop()
  84. }
  85. func (b *RecordBatch) decode(pd packetDecoder) (err error) {
  86. if b.FirstOffset, err = pd.getInt64(); err != nil {
  87. return err
  88. }
  89. batchLen, err := pd.getInt32()
  90. if err != nil {
  91. return err
  92. }
  93. if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
  94. return err
  95. }
  96. if b.Version, err = pd.getInt8(); err != nil {
  97. return err
  98. }
  99. if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
  100. return err
  101. }
  102. attributes, err := pd.getInt16()
  103. if err != nil {
  104. return err
  105. }
  106. b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
  107. b.Control = attributes&controlMask == controlMask
  108. b.LogAppendTime = attributes&timestampTypeMask == timestampTypeMask
  109. b.IsTransactional = attributes&isTransactionalMask == isTransactionalMask
  110. if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
  111. return err
  112. }
  113. if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
  114. return err
  115. }
  116. if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
  117. return err
  118. }
  119. if b.ProducerID, err = pd.getInt64(); err != nil {
  120. return err
  121. }
  122. if b.ProducerEpoch, err = pd.getInt16(); err != nil {
  123. return err
  124. }
  125. if b.FirstSequence, err = pd.getInt32(); err != nil {
  126. return err
  127. }
  128. numRecs, err := pd.getArrayLength()
  129. if err != nil {
  130. return err
  131. }
  132. if numRecs >= 0 {
  133. b.Records = make([]*Record, numRecs)
  134. }
  135. bufSize := int(batchLen) - recordBatchOverhead
  136. recBuffer, err := pd.getRawBytes(bufSize)
  137. if err != nil {
  138. if err == ErrInsufficientData {
  139. b.PartialTrailingRecord = true
  140. b.Records = nil
  141. return nil
  142. }
  143. return err
  144. }
  145. if err = pd.pop(); err != nil {
  146. return err
  147. }
  148. recBuffer, err = decompress(b.Codec, recBuffer)
  149. if err != nil {
  150. return err
  151. }
  152. b.recordsLen = len(recBuffer)
  153. err = decode(recBuffer, recordsArray(b.Records))
  154. if err == ErrInsufficientData {
  155. b.PartialTrailingRecord = true
  156. b.Records = nil
  157. return nil
  158. }
  159. return err
  160. }
  161. func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
  162. var raw []byte
  163. var err error
  164. if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
  165. return err
  166. }
  167. b.recordsLen = len(raw)
  168. b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw)
  169. return err
  170. }
  171. func (b *RecordBatch) computeAttributes() int16 {
  172. attr := int16(b.Codec) & int16(compressionCodecMask)
  173. if b.Control {
  174. attr |= controlMask
  175. }
  176. if b.LogAppendTime {
  177. attr |= timestampTypeMask
  178. }
  179. if b.IsTransactional {
  180. attr |= isTransactionalMask
  181. }
  182. return attr
  183. }
  184. func (b *RecordBatch) addRecord(r *Record) {
  185. b.Records = append(b.Records, r)
  186. }