// record_batch.go
  1. package sarama
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "io/ioutil"
  7. "time"
  8. "github.com/eapache/go-xerial-snappy"
  9. "github.com/pierrec/lz4"
  10. )
// recordBatchOverhead is the number of header bytes that follow the batch-length
// field and precede the records payload: partition leader epoch (4) + magic (1) +
// CRC (4) + attributes (2) + last offset delta (4) + first timestamp (8) +
// max timestamp (8) + producer ID (8) + producer epoch (2) + first sequence (4) +
// record count (4) = 49. decode subtracts it from batchLen to size the payload.
const recordBatchOverhead = 49
// recordsArray wraps a slice of Records so the whole slice can be encoded and
// decoded as a single unit through the packetEncoder/packetDecoder interfaces.
type recordsArray []*Record
  13. func (e recordsArray) encode(pe packetEncoder) error {
  14. for _, r := range e {
  15. if err := r.encode(pe); err != nil {
  16. return err
  17. }
  18. }
  19. return nil
  20. }
  21. func (e recordsArray) decode(pd packetDecoder) error {
  22. for i := range e {
  23. rec := &Record{}
  24. if err := rec.decode(pd); err != nil {
  25. return err
  26. }
  27. e[i] = rec
  28. }
  29. return nil
  30. }
// RecordBatch is the v2 (magic byte 2) on-the-wire representation of a
// batch of Kafka records; encode rejects any other Version.
type RecordBatch struct {
	FirstOffset          int64
	PartitionLeaderEpoch int32
	Version              int8             // magic byte; only 2 is supported by encode
	Codec                CompressionCodec // compression applied to the records payload
	CompressionLevel     int
	Control              bool // mirrored into/out of the attributes bitfield
	LastOffsetDelta      int32
	FirstTimestamp       time.Time
	MaxTimestamp         time.Time
	ProducerID           int64
	ProducerEpoch        int16
	FirstSequence        int32
	Records              []*Record
	// PartialTrailingRecord is set by decode when the payload ends with a
	// truncated record (ErrInsufficientData); Records is then left nil.
	PartialTrailingRecord bool

	compressedRecords []byte // cached encoded+compressed records, reused by encode
	recordsLen        int    // uncompressed records size
}
  49. func (b *RecordBatch) encode(pe packetEncoder) error {
  50. if b.Version != 2 {
  51. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  52. }
  53. pe.putInt64(b.FirstOffset)
  54. pe.push(&lengthField{})
  55. pe.putInt32(b.PartitionLeaderEpoch)
  56. pe.putInt8(b.Version)
  57. pe.push(newCRC32Field(crcCastagnoli))
  58. pe.putInt16(b.computeAttributes())
  59. pe.putInt32(b.LastOffsetDelta)
  60. if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
  61. return err
  62. }
  63. if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
  64. return err
  65. }
  66. pe.putInt64(b.ProducerID)
  67. pe.putInt16(b.ProducerEpoch)
  68. pe.putInt32(b.FirstSequence)
  69. if err := pe.putArrayLength(len(b.Records)); err != nil {
  70. return err
  71. }
  72. if b.compressedRecords == nil {
  73. if err := b.encodeRecords(pe); err != nil {
  74. return err
  75. }
  76. }
  77. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  78. return err
  79. }
  80. if err := pe.pop(); err != nil {
  81. return err
  82. }
  83. return pe.pop()
  84. }
// decode reads a v2 record batch from pd. If the payload is truncated
// part-way through the records (ErrInsufficientData), it returns nil with
// PartialTrailingRecord set and Records nil so the caller can refetch with
// a larger buffer.
func (b *RecordBatch) decode(pd packetDecoder) (err error) {
	if b.FirstOffset, err = pd.getInt64(); err != nil {
		return err
	}

	// batchLen counts the bytes that follow this length field.
	batchLen, err := pd.getInt32()
	if err != nil {
		return err
	}

	if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
		return err
	}
	if b.Version, err = pd.getInt8(); err != nil {
		return err
	}

	// The Castagnoli CRC covers everything from the attributes field to the
	// end of the batch; it is verified by the pd.pop() after the raw bytes
	// are consumed.
	if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
		return err
	}

	attributes, err := pd.getInt16()
	if err != nil {
		return err
	}
	// Unpack the attributes bitfield (see computeAttributes for encoding).
	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
	b.Control = attributes&controlMask == controlMask

	if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
		return err
	}
	if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
		return err
	}
	if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
		return err
	}
	if b.ProducerID, err = pd.getInt64(); err != nil {
		return err
	}
	if b.ProducerEpoch, err = pd.getInt16(); err != nil {
		return err
	}
	if b.FirstSequence, err = pd.getInt32(); err != nil {
		return err
	}

	numRecs, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	// A negative count means a null array; leave Records nil in that case.
	if numRecs >= 0 {
		b.Records = make([]*Record, numRecs)
	}

	// Everything after the fixed header is the (possibly compressed)
	// records payload.
	bufSize := int(batchLen) - recordBatchOverhead
	recBuffer, err := pd.getRawBytes(bufSize)
	if err != nil {
		if err == ErrInsufficientData {
			// Truncated batch: report a partial trailing record, not an error.
			b.PartialTrailingRecord = true
			b.Records = nil
			return nil
		}
		return err
	}
	// Verify the CRC pushed above.
	if err = pd.pop(); err != nil {
		return err
	}

	// Decompress the payload according to the codec from the attributes.
	switch b.Codec {
	case CompressionNone:
	case CompressionGZIP:
		reader, err := gzip.NewReader(bytes.NewReader(recBuffer))
		if err != nil {
			return err
		}
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	case CompressionSnappy:
		if recBuffer, err = snappy.Decode(recBuffer); err != nil {
			return err
		}
	case CompressionLZ4:
		reader := lz4.NewReader(bytes.NewReader(recBuffer))
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	case CompressionZSTD:
		if recBuffer, err = zstdDecompress(nil, recBuffer); err != nil {
			return err
		}
	default:
		return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
	}

	b.recordsLen = len(recBuffer)
	err = decode(recBuffer, recordsArray(b.Records))
	if err == ErrInsufficientData {
		// The uncompressed payload ended mid-record.
		b.PartialTrailingRecord = true
		b.Records = nil
		return nil
	}
	return err
}
  181. func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
  182. var raw []byte
  183. var err error
  184. if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
  185. return err
  186. }
  187. b.recordsLen = len(raw)
  188. switch b.Codec {
  189. case CompressionNone:
  190. b.compressedRecords = raw
  191. case CompressionGZIP:
  192. var buf bytes.Buffer
  193. var writer *gzip.Writer
  194. if b.CompressionLevel != CompressionLevelDefault {
  195. writer, err = gzip.NewWriterLevel(&buf, b.CompressionLevel)
  196. if err != nil {
  197. return err
  198. }
  199. } else {
  200. writer = gzip.NewWriter(&buf)
  201. }
  202. if _, err := writer.Write(raw); err != nil {
  203. return err
  204. }
  205. if err := writer.Close(); err != nil {
  206. return err
  207. }
  208. b.compressedRecords = buf.Bytes()
  209. case CompressionSnappy:
  210. b.compressedRecords = snappy.Encode(raw)
  211. case CompressionLZ4:
  212. var buf bytes.Buffer
  213. writer := lz4.NewWriter(&buf)
  214. if _, err := writer.Write(raw); err != nil {
  215. return err
  216. }
  217. if err := writer.Close(); err != nil {
  218. return err
  219. }
  220. b.compressedRecords = buf.Bytes()
  221. case CompressionZSTD:
  222. c, err := zstdCompressLevel(nil, raw, b.CompressionLevel)
  223. if err != nil {
  224. return err
  225. }
  226. b.compressedRecords = c
  227. default:
  228. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  229. }
  230. return nil
  231. }
  232. func (b *RecordBatch) computeAttributes() int16 {
  233. attr := int16(b.Codec) & int16(compressionCodecMask)
  234. if b.Control {
  235. attr |= controlMask
  236. }
  237. return attr
  238. }
// addRecord appends r to the batch's record slice.
func (b *RecordBatch) addRecord(r *Record) {
	b.Records = append(b.Records, r)
}