// record_batch.go
  1. package sarama
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. const recordBatchOverhead = 49
  7. type recordsArray []*Record
  8. func (e recordsArray) encode(pe packetEncoder) error {
  9. for _, r := range e {
  10. if err := r.encode(pe); err != nil {
  11. return err
  12. }
  13. }
  14. return nil
  15. }
  16. func (e recordsArray) decode(pd packetDecoder) error {
  17. for i := range e {
  18. rec := &Record{}
  19. if err := rec.decode(pd); err != nil {
  20. return err
  21. }
  22. e[i] = rec
  23. }
  24. return nil
  25. }
// RecordBatch holds a Kafka record batch (message format version 2):
// the batch header fields plus the records it carries.
type RecordBatch struct {
	FirstOffset          int64
	PartitionLeaderEpoch int32
	Version              int8             // message format version; encode only accepts 2
	Codec                CompressionCodec // compression applied to the records payload
	CompressionLevel     int
	Control              bool // control-batch bit from the attributes field
	LogAppendTime        bool // timestamp-type bit: broker log-append time vs. create time
	LastOffsetDelta      int32
	FirstTimestamp       time.Time
	MaxTimestamp         time.Time
	ProducerID           int64
	ProducerEpoch        int16
	FirstSequence        int32
	Records              []*Record
	PartialTrailingRecord bool // set by decode when the buffer ends mid-record

	compressedRecords []byte // cached compressed encoding of Records, built lazily by encodeRecords
	recordsLen        int    // uncompressed records size
}
  45. func (b *RecordBatch) encode(pe packetEncoder) error {
  46. if b.Version != 2 {
  47. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  48. }
  49. pe.putInt64(b.FirstOffset)
  50. pe.push(&lengthField{})
  51. pe.putInt32(b.PartitionLeaderEpoch)
  52. pe.putInt8(b.Version)
  53. pe.push(newCRC32Field(crcCastagnoli))
  54. pe.putInt16(b.computeAttributes())
  55. pe.putInt32(b.LastOffsetDelta)
  56. if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
  57. return err
  58. }
  59. if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
  60. return err
  61. }
  62. pe.putInt64(b.ProducerID)
  63. pe.putInt16(b.ProducerEpoch)
  64. pe.putInt32(b.FirstSequence)
  65. if err := pe.putArrayLength(len(b.Records)); err != nil {
  66. return err
  67. }
  68. if b.compressedRecords == nil {
  69. if err := b.encodeRecords(pe); err != nil {
  70. return err
  71. }
  72. }
  73. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  74. return err
  75. }
  76. if err := pe.pop(); err != nil {
  77. return err
  78. }
  79. return pe.pop()
  80. }
// decode reads a v2 record batch from pd. If the buffer ends partway
// through the records payload, the batch is marked
// PartialTrailingRecord and decode returns nil with b.Records empty.
func (b *RecordBatch) decode(pd packetDecoder) (err error) {
	if b.FirstOffset, err = pd.getInt64(); err != nil {
		return err
	}

	// batchLen counts the bytes after this length field; used below to
	// size the raw records buffer.
	batchLen, err := pd.getInt32()
	if err != nil {
		return err
	}

	if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
		return err
	}

	if b.Version, err = pd.getInt8(); err != nil {
		return err
	}

	// The CRC field covers everything from attributes to the end of the
	// batch; verified on pop below.
	if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
		return err
	}

	// Unpack the attributes bitfield: codec bits, control bit,
	// timestamp-type bit.
	attributes, err := pd.getInt16()
	if err != nil {
		return err
	}
	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
	b.Control = attributes&controlMask == controlMask
	b.LogAppendTime = attributes&timestampTypeMask == timestampTypeMask

	if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
		return err
	}

	if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
		return err
	}
	if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
		return err
	}

	if b.ProducerID, err = pd.getInt64(); err != nil {
		return err
	}
	if b.ProducerEpoch, err = pd.getInt16(); err != nil {
		return err
	}
	if b.FirstSequence, err = pd.getInt32(); err != nil {
		return err
	}

	numRecs, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	// A negative count (null array) leaves b.Records nil.
	if numRecs >= 0 {
		b.Records = make([]*Record, numRecs)
	}

	// Everything after the fixed header is the (possibly compressed)
	// records payload.
	bufSize := int(batchLen) - recordBatchOverhead
	recBuffer, err := pd.getRawBytes(bufSize)
	if err != nil {
		if err == ErrInsufficientData {
			// Truncated fetch response: the broker may cut a batch short.
			b.PartialTrailingRecord = true
			b.Records = nil
			return nil
		}
		return err
	}

	if err = pd.pop(); err != nil { // verify CRC
		return err
	}

	recBuffer, err = decompress(b.Codec, recBuffer)
	if err != nil {
		return err
	}

	b.recordsLen = len(recBuffer)
	err = decode(recBuffer, recordsArray(b.Records))
	if err == ErrInsufficientData {
		// The payload itself was truncated mid-record.
		b.PartialTrailingRecord = true
		b.Records = nil
		return nil
	}
	return err
}
  156. func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
  157. var raw []byte
  158. var err error
  159. if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
  160. return err
  161. }
  162. b.recordsLen = len(raw)
  163. b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw)
  164. return err
  165. }
  166. func (b *RecordBatch) computeAttributes() int16 {
  167. attr := int16(b.Codec) & int16(compressionCodecMask)
  168. if b.Control {
  169. attr |= controlMask
  170. }
  171. if b.LogAppendTime {
  172. attr |= timestampTypeMask
  173. }
  174. return attr
  175. }
// addRecord appends r to the batch's record list. The cached
// compressed encoding is not invalidated here; callers add records
// before the batch is first encoded.
func (b *RecordBatch) addRecord(r *Record) {
	b.Records = append(b.Records, r)
}