record_batch.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. package sarama
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "io/ioutil"
  7. "time"
  8. "github.com/DataDog/zstd"
  9. "github.com/eapache/go-xerial-snappy"
  10. "github.com/pierrec/lz4"
  11. )
// recordBatchOverhead is the number of header bytes in a v2 record batch
// that precede the records themselves (everything counted by batchLength
// except the record set); decode subtracts it to size the raw record buffer.
const recordBatchOverhead = 49

// recordsArray wraps []*Record so a whole record set can be encoded or
// decoded through the packetEncoder/packetDecoder interfaces.
type recordsArray []*Record
  14. func (e recordsArray) encode(pe packetEncoder) error {
  15. for _, r := range e {
  16. if err := r.encode(pe); err != nil {
  17. return err
  18. }
  19. }
  20. return nil
  21. }
  22. func (e recordsArray) decode(pd packetDecoder) error {
  23. for i := range e {
  24. rec := &Record{}
  25. if err := rec.decode(pd); err != nil {
  26. return err
  27. }
  28. e[i] = rec
  29. }
  30. return nil
  31. }
// RecordBatch is the v2 (magic byte 2) Kafka message container: a fixed
// header followed by a possibly compressed set of Records.
type RecordBatch struct {
	FirstOffset          int64 // base offset of the batch
	PartitionLeaderEpoch int32
	Version              int8             // magic byte; encode only supports 2
	Codec                CompressionCodec // codec applied to the record set
	CompressionLevel     int
	Control              bool // set when the control bit is present in the attributes
	LastOffsetDelta      int32
	FirstTimestamp       time.Time
	MaxTimestamp         time.Time
	ProducerID           int64
	ProducerEpoch        int16
	FirstSequence        int32
	Records              []*Record
	PartialTrailingRecord bool // batch was truncated mid-record (e.g. fetch size limit)

	compressedRecords []byte // cached compressed form of Records, built lazily on encode
	recordsLen        int    // uncompressed records size
}
  50. func (b *RecordBatch) encode(pe packetEncoder) error {
  51. if b.Version != 2 {
  52. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  53. }
  54. pe.putInt64(b.FirstOffset)
  55. pe.push(&lengthField{})
  56. pe.putInt32(b.PartitionLeaderEpoch)
  57. pe.putInt8(b.Version)
  58. pe.push(newCRC32Field(crcCastagnoli))
  59. pe.putInt16(b.computeAttributes())
  60. pe.putInt32(b.LastOffsetDelta)
  61. if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
  62. return err
  63. }
  64. if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
  65. return err
  66. }
  67. pe.putInt64(b.ProducerID)
  68. pe.putInt16(b.ProducerEpoch)
  69. pe.putInt32(b.FirstSequence)
  70. if err := pe.putArrayLength(len(b.Records)); err != nil {
  71. return err
  72. }
  73. if b.compressedRecords == nil {
  74. if err := b.encodeRecords(pe); err != nil {
  75. return err
  76. }
  77. }
  78. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  79. return err
  80. }
  81. if err := pe.pop(); err != nil {
  82. return err
  83. }
  84. return pe.pop()
  85. }
// decode reads a v2 record batch from pd. If the raw record set is
// truncated (common at the tail of a size-limited fetch response),
// PartialTrailingRecord is set and nil is returned instead of an error.
func (b *RecordBatch) decode(pd packetDecoder) (err error) {
	if b.FirstOffset, err = pd.getInt64(); err != nil {
		return err
	}

	// batchLen counts the bytes that follow this length field; used below
	// (minus recordBatchOverhead) to size the raw record buffer.
	batchLen, err := pd.getInt32()
	if err != nil {
		return err
	}

	if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
		return err
	}

	if b.Version, err = pd.getInt8(); err != nil {
		return err
	}

	// CRC-32C (Castagnoli) covers everything from the attributes onward;
	// it is verified by the pd.pop() after the raw bytes are read.
	if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
		return err
	}

	attributes, err := pd.getInt16()
	if err != nil {
		return err
	}
	// Low bits of the attributes carry the codec; controlMask flags a
	// control batch.
	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
	b.Control = attributes&controlMask == controlMask

	if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
		return err
	}

	if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
		return err
	}

	if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
		return err
	}

	if b.ProducerID, err = pd.getInt64(); err != nil {
		return err
	}

	if b.ProducerEpoch, err = pd.getInt16(); err != nil {
		return err
	}

	if b.FirstSequence, err = pd.getInt32(); err != nil {
		return err
	}

	numRecs, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	// A negative count means a null record set; only allocate otherwise.
	if numRecs >= 0 {
		b.Records = make([]*Record, numRecs)
	}

	// Everything remaining in the batch is the (possibly compressed) record set.
	bufSize := int(batchLen) - recordBatchOverhead
	recBuffer, err := pd.getRawBytes(bufSize)
	if err != nil {
		if err == ErrInsufficientData {
			// Truncated batch: report it as partial instead of failing.
			b.PartialTrailingRecord = true
			b.Records = nil
			return nil
		}
		return err
	}

	// Check the CRC before attempting decompression.
	if err = pd.pop(); err != nil {
		return err
	}

	// Decompress the record set according to the codec from the attributes.
	switch b.Codec {
	case CompressionNone:
	case CompressionGZIP:
		reader, err := gzip.NewReader(bytes.NewReader(recBuffer))
		if err != nil {
			return err
		}
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	case CompressionSnappy:
		if recBuffer, err = snappy.Decode(recBuffer); err != nil {
			return err
		}
	case CompressionLZ4:
		reader := lz4.NewReader(bytes.NewReader(recBuffer))
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	case CompressionZSTD:
		if recBuffer, err = zstd.Decompress(nil, recBuffer); err != nil {
			return err
		}
	default:
		return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
	}

	b.recordsLen = len(recBuffer)
	err = decode(recBuffer, recordsArray(b.Records))
	// A record set that decompresses but ends mid-record is also partial.
	if err == ErrInsufficientData {
		b.PartialTrailingRecord = true
		b.Records = nil
		return nil
	}
	return err
}
  182. func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
  183. var raw []byte
  184. var err error
  185. if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
  186. return err
  187. }
  188. b.recordsLen = len(raw)
  189. switch b.Codec {
  190. case CompressionNone:
  191. b.compressedRecords = raw
  192. case CompressionGZIP:
  193. var buf bytes.Buffer
  194. var writer *gzip.Writer
  195. if b.CompressionLevel != CompressionLevelDefault {
  196. writer, err = gzip.NewWriterLevel(&buf, b.CompressionLevel)
  197. if err != nil {
  198. return err
  199. }
  200. } else {
  201. writer = gzip.NewWriter(&buf)
  202. }
  203. if _, err := writer.Write(raw); err != nil {
  204. return err
  205. }
  206. if err := writer.Close(); err != nil {
  207. return err
  208. }
  209. b.compressedRecords = buf.Bytes()
  210. case CompressionSnappy:
  211. b.compressedRecords = snappy.Encode(raw)
  212. case CompressionLZ4:
  213. var buf bytes.Buffer
  214. writer := lz4.NewWriter(&buf)
  215. if _, err := writer.Write(raw); err != nil {
  216. return err
  217. }
  218. if err := writer.Close(); err != nil {
  219. return err
  220. }
  221. b.compressedRecords = buf.Bytes()
  222. case CompressionZSTD:
  223. c, err := zstd.CompressLevel(nil, raw, b.CompressionLevel)
  224. if err != nil {
  225. return err
  226. }
  227. b.compressedRecords = c
  228. default:
  229. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  230. }
  231. return nil
  232. }
  233. func (b *RecordBatch) computeAttributes() int16 {
  234. attr := int16(b.Codec) & int16(compressionCodecMask)
  235. if b.Control {
  236. attr |= controlMask
  237. }
  238. return attr
  239. }
// addRecord appends r to the batch's record set. NOTE(review): it does not
// invalidate compressedRecords, so records added after an encode will not
// appear in subsequent encodes that reuse the cache — confirm callers only
// add records before the first encode.
func (b *RecordBatch) addRecord(r *Record) {
	b.Records = append(b.Records, r)
}