record_batch.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package sarama
  2. import (
  3. "fmt"
  4. "time"
  5. )
// recordBatchOverhead is the number of header bytes in a v2 record batch
// that are counted by the batchLength field but precede the records
// themselves (partition leader epoch through the record-count field).
// decode subtracts it from batchLen to size the raw records buffer.
const recordBatchOverhead = 49
  7. type recordsArray []*Record
  8. func (e recordsArray) encode(pe packetEncoder) error {
  9. for _, r := range e {
  10. if err := r.encode(pe); err != nil {
  11. return err
  12. }
  13. }
  14. return nil
  15. }
  16. func (e recordsArray) decode(pd packetDecoder) error {
  17. for i := range e {
  18. rec := &Record{}
  19. if err := rec.decode(pd); err != nil {
  20. return err
  21. }
  22. e[i] = rec
  23. }
  24. return nil
  25. }
// RecordBatch represents a Kafka v2 (magic byte 2) record batch: the
// on-wire container holding a compressed run of Records plus the batch
// header fields (offsets, timestamps, producer metadata).
type RecordBatch struct {
	FirstOffset           int64            // base offset of the first record in the batch
	PartitionLeaderEpoch  int32            // leader epoch of the partition when the batch was written
	Version               int8             // magic byte; only version 2 is supported by encode
	Codec                 CompressionCodec // compression applied to the records payload
	CompressionLevel      int              // codec-specific compression level passed to compress
	Control               bool             // true for control batches (transaction markers)
	LastOffsetDelta       int32            // offset delta of the last record relative to FirstOffset
	FirstTimestamp        time.Time        // timestamp of the first record
	MaxTimestamp          time.Time        // maximum timestamp across all records
	ProducerID            int64            // idempotent/transactional producer id
	ProducerEpoch         int16            // epoch of the producer id
	FirstSequence         int32            // sequence number of the first record
	Records               []*Record        // decoded records; nil when only a partial batch was received
	PartialTrailingRecord bool             // set by decode when the batch was truncated mid-record

	compressedRecords []byte // cached compressed payload, built lazily by encodeRecords
	recordsLen        int    // uncompressed records size
}
  44. func (b *RecordBatch) encode(pe packetEncoder) error {
  45. if b.Version != 2 {
  46. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  47. }
  48. pe.putInt64(b.FirstOffset)
  49. pe.push(&lengthField{})
  50. pe.putInt32(b.PartitionLeaderEpoch)
  51. pe.putInt8(b.Version)
  52. pe.push(newCRC32Field(crcCastagnoli))
  53. pe.putInt16(b.computeAttributes())
  54. pe.putInt32(b.LastOffsetDelta)
  55. if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
  56. return err
  57. }
  58. if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
  59. return err
  60. }
  61. pe.putInt64(b.ProducerID)
  62. pe.putInt16(b.ProducerEpoch)
  63. pe.putInt32(b.FirstSequence)
  64. if err := pe.putArrayLength(len(b.Records)); err != nil {
  65. return err
  66. }
  67. if b.compressedRecords == nil {
  68. if err := b.encodeRecords(pe); err != nil {
  69. return err
  70. }
  71. }
  72. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  73. return err
  74. }
  75. if err := pe.pop(); err != nil {
  76. return err
  77. }
  78. return pe.pop()
  79. }
// decode parses a v2 record batch from pd. Kafka brokers may legally
// truncate the final batch of a fetch response; when the raw records
// payload (or a record within it) is cut short, decode sets
// PartialTrailingRecord, clears Records, and returns nil rather than an
// error so the caller can discard the partial tail.
func (b *RecordBatch) decode(pd packetDecoder) (err error) {
	if b.FirstOffset, err = pd.getInt64(); err != nil {
		return err
	}

	// batchLen counts every byte after the length field itself.
	batchLen, err := pd.getInt32()
	if err != nil {
		return err
	}

	if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
		return err
	}

	if b.Version, err = pd.getInt8(); err != nil {
		return err
	}

	// CRC-32C covers everything from the attributes field to the end of
	// the batch; pushed here, verified by the pop() below.
	if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
		return err
	}

	attributes, err := pd.getInt16()
	if err != nil {
		return err
	}
	// Low bits of attributes select the codec; controlMask flags control batches.
	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
	b.Control = attributes&controlMask == controlMask

	if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
		return err
	}

	if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
		return err
	}

	if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
		return err
	}

	if b.ProducerID, err = pd.getInt64(); err != nil {
		return err
	}

	if b.ProducerEpoch, err = pd.getInt16(); err != nil {
		return err
	}

	if b.FirstSequence, err = pd.getInt32(); err != nil {
		return err
	}

	numRecs, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	// A negative count is possible on the wire (null array); only
	// allocate for a non-negative count.
	if numRecs >= 0 {
		b.Records = make([]*Record, numRecs)
	}

	// Everything remaining in the batch is the raw (possibly compressed)
	// records payload: batchLen minus the fixed header overhead.
	bufSize := int(batchLen) - recordBatchOverhead
	recBuffer, err := pd.getRawBytes(bufSize)
	if err != nil {
		if err == ErrInsufficientData {
			// Truncated batch: report success with a partial-trailing flag.
			b.PartialTrailingRecord = true
			b.Records = nil
			return nil
		}
		return err
	}

	// Verify the CRC before touching the payload.
	if err = pd.pop(); err != nil {
		return err
	}

	recBuffer, err = decompress(b.Codec, recBuffer)
	if err != nil {
		return err
	}

	b.recordsLen = len(recBuffer)
	err = decode(recBuffer, recordsArray(b.Records))
	if err == ErrInsufficientData {
		// The payload was present but a record inside it was cut short.
		b.PartialTrailingRecord = true
		b.Records = nil
		return nil
	}
	return err
}
  154. func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
  155. var raw []byte
  156. var err error
  157. if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
  158. return err
  159. }
  160. b.recordsLen = len(raw)
  161. b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw)
  162. return err
  163. }
  164. func (b *RecordBatch) computeAttributes() int16 {
  165. attr := int16(b.Codec) & int16(compressionCodecMask)
  166. if b.Control {
  167. attr |= controlMask
  168. }
  169. return attr
  170. }
// addRecord appends r to the batch's record set.
func (b *RecordBatch) addRecord(r *Record) {
	b.Records = append(b.Records, r)
}