record_batch.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. package sarama
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "io/ioutil"
  7. "time"
  8. "github.com/DataDog/zstd"
  9. "github.com/eapache/go-xerial-snappy"
  10. "github.com/pierrec/lz4"
  11. )
// recordBatchOverhead is the number of bytes of batch header that follow the
// batch-length field (partition leader epoch through record count); decode
// subtracts it from batchLen to size the raw records buffer.
const recordBatchOverhead = 49

// recordsArray lets a []*Record be encoded/decoded as a single unit via the
// packetEncoder/packetDecoder interfaces.
type recordsArray []*Record
  14. func (e recordsArray) encode(pe packetEncoder) error {
  15. for _, r := range e {
  16. if err := r.encode(pe); err != nil {
  17. return err
  18. }
  19. }
  20. return nil
  21. }
  22. func (e recordsArray) decode(pd packetDecoder) error {
  23. for i := range e {
  24. rec := &Record{}
  25. if err := rec.decode(pd); err != nil {
  26. return err
  27. }
  28. e[i] = rec
  29. }
  30. return nil
  31. }
// RecordBatch represents a Kafka record batch in message format version 2
// (the only version this struct encodes — see encode's Version check).
type RecordBatch struct {
	FirstOffset          int64 // offset of the first record in the batch
	PartitionLeaderEpoch int32
	Version              int8             // message format version; must be 2
	Codec                CompressionCodec // compression applied to the records payload
	CompressionLevel     int              // level passed to the gzip/zstd writers
	Control              bool             // set when the attributes control bit is present
	LastOffsetDelta      int32
	FirstTimestamp       time.Time
	MaxTimestamp         time.Time
	ProducerID           int64
	ProducerEpoch        int16
	FirstSequence        int32
	Records              []*Record
	PartialTrailingRecord bool // set by decode when the batch was truncated mid-record

	compressedRecords []byte // cached wire form of Records, built by encodeRecords
	recordsLen        int    // uncompressed records size
}
  50. func (b *RecordBatch) encode(pe packetEncoder) error {
  51. if b.Version != 2 {
  52. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  53. }
  54. pe.putInt64(b.FirstOffset)
  55. pe.push(&lengthField{})
  56. pe.putInt32(b.PartitionLeaderEpoch)
  57. pe.putInt8(b.Version)
  58. pe.push(newCRC32Field(crcCastagnoli))
  59. pe.putInt16(b.computeAttributes())
  60. pe.putInt32(b.LastOffsetDelta)
  61. if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
  62. return err
  63. }
  64. if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
  65. return err
  66. }
  67. pe.putInt64(b.ProducerID)
  68. pe.putInt16(b.ProducerEpoch)
  69. pe.putInt32(b.FirstSequence)
  70. if err := pe.putArrayLength(len(b.Records)); err != nil {
  71. return err
  72. }
  73. if b.compressedRecords == nil {
  74. if err := b.encodeRecords(pe); err != nil {
  75. return err
  76. }
  77. }
  78. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  79. return err
  80. }
  81. if err := pe.pop(); err != nil {
  82. return err
  83. }
  84. return pe.pop()
  85. }
// decode reads a v2 record batch from pd. A broker may legally return a
// truncated batch at the tail of a fetch response; in that case decode sets
// PartialTrailingRecord, clears Records, and returns nil rather than an error.
func (b *RecordBatch) decode(pd packetDecoder) (err error) {
	if b.FirstOffset, err = pd.getInt64(); err != nil {
		return err
	}

	// batchLen counts the bytes following this length field; used below to
	// size the raw records buffer.
	batchLen, err := pd.getInt32()
	if err != nil {
		return err
	}

	if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
		return err
	}

	if b.Version, err = pd.getInt8(); err != nil {
		return err
	}

	// Everything from the attributes field onward is covered by a Castagnoli
	// CRC32; it is verified by the pd.pop() call after the payload is read.
	if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
		return err
	}

	attributes, err := pd.getInt16()
	if err != nil {
		return err
	}
	// Low bits of attributes hold the codec; the control flag is a single bit.
	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
	b.Control = attributes&controlMask == controlMask

	if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
		return err
	}

	if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
		return err
	}

	if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
		return err
	}

	if b.ProducerID, err = pd.getInt64(); err != nil {
		return err
	}

	if b.ProducerEpoch, err = pd.getInt16(); err != nil {
		return err
	}

	if b.FirstSequence, err = pd.getInt32(); err != nil {
		return err
	}

	numRecs, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	// A negative count denotes a null array; leave Records nil in that case.
	if numRecs >= 0 {
		b.Records = make([]*Record, numRecs)
	}

	// The records payload is whatever remains of the batch after the
	// fixed-size header fields accounted for by recordBatchOverhead.
	bufSize := int(batchLen) - recordBatchOverhead
	recBuffer, err := pd.getRawBytes(bufSize)
	if err != nil {
		if err == ErrInsufficientData {
			// Truncated batch: report success with a partial-record marker.
			b.PartialTrailingRecord = true
			b.Records = nil
			return nil
		}
		return err
	}

	// Verify the CRC pushed above now that the covered bytes are consumed.
	if err = pd.pop(); err != nil {
		return err
	}

	// Decompress the raw payload according to the codec from the attributes.
	switch b.Codec {
	case CompressionNone:
	case CompressionGZIP:
		reader, err := gzip.NewReader(bytes.NewReader(recBuffer))
		if err != nil {
			return err
		}
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	case CompressionSnappy:
		if recBuffer, err = snappy.Decode(recBuffer); err != nil {
			return err
		}
	case CompressionLZ4:
		reader := lz4.NewReader(bytes.NewReader(recBuffer))
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	case CompressionZSTD:
		reader := zstd.NewReader(bytes.NewReader(recBuffer))
		defer reader.Close()
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	default:
		return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
	}

	b.recordsLen = len(recBuffer)
	err = decode(recBuffer, recordsArray(b.Records))
	// The decompressed payload can also end mid-record on a truncated batch.
	if err == ErrInsufficientData {
		b.PartialTrailingRecord = true
		b.Records = nil
		return nil
	}
	return err
}
  184. func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
  185. var raw []byte
  186. var err error
  187. if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
  188. return err
  189. }
  190. b.recordsLen = len(raw)
  191. switch b.Codec {
  192. case CompressionNone:
  193. b.compressedRecords = raw
  194. case CompressionGZIP:
  195. var buf bytes.Buffer
  196. var writer *gzip.Writer
  197. if b.CompressionLevel != CompressionLevelDefault {
  198. writer, err = gzip.NewWriterLevel(&buf, b.CompressionLevel)
  199. if err != nil {
  200. return err
  201. }
  202. } else {
  203. writer = gzip.NewWriter(&buf)
  204. }
  205. if _, err := writer.Write(raw); err != nil {
  206. return err
  207. }
  208. if err := writer.Close(); err != nil {
  209. return err
  210. }
  211. b.compressedRecords = buf.Bytes()
  212. case CompressionSnappy:
  213. b.compressedRecords = snappy.Encode(raw)
  214. case CompressionLZ4:
  215. var buf bytes.Buffer
  216. writer := lz4.NewWriter(&buf)
  217. if _, err := writer.Write(raw); err != nil {
  218. return err
  219. }
  220. if err := writer.Close(); err != nil {
  221. return err
  222. }
  223. b.compressedRecords = buf.Bytes()
  224. case CompressionZSTD:
  225. var buf bytes.Buffer
  226. writer := zstd.NewWriterLevel(&buf, b.CompressionLevel)
  227. if _, err = writer.Write(raw); err != nil {
  228. return err
  229. }
  230. if err = writer.Close(); err != nil {
  231. return err
  232. }
  233. b.compressedRecords = buf.Bytes()
  234. default:
  235. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  236. }
  237. return nil
  238. }
  239. func (b *RecordBatch) computeAttributes() int16 {
  240. attr := int16(b.Codec) & int16(compressionCodecMask)
  241. if b.Control {
  242. attr |= controlMask
  243. }
  244. return attr
  245. }
  246. func (b *RecordBatch) addRecord(r *Record) {
  247. b.Records = append(b.Records, r)
  248. }