record_batch.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package sarama
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "io/ioutil"
  7. "github.com/eapache/go-xerial-snappy"
  8. "github.com/pierrec/lz4"
  9. )
  10. const recordBatchOverhead = 49
  11. type RecordBatch struct {
  12. FirstOffset int64
  13. PartitionLeaderEpoch int32
  14. Version int8
  15. Codec CompressionCodec
  16. Control bool
  17. LastOffsetDelta int32
  18. FirstTimestamp int64
  19. MaxTimestamp int64
  20. ProducerID int64
  21. ProducerEpoch int16
  22. FirstSequence int32
  23. Records []*Record
  24. PartialTrailingRecord bool
  25. compressedRecords []byte
  26. recordsLen int
  27. }
  28. func (b *RecordBatch) encode(pe packetEncoder) error {
  29. if b.Version != 2 {
  30. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  31. }
  32. pe.putInt64(b.FirstOffset)
  33. pe.push(&lengthField{})
  34. pe.putInt32(b.PartitionLeaderEpoch)
  35. pe.putInt8(b.Version)
  36. pe.push(newCRC32Field(crcCastagnoli))
  37. pe.putInt16(b.computeAttributes())
  38. pe.putInt32(b.LastOffsetDelta)
  39. pe.putInt64(b.FirstTimestamp)
  40. pe.putInt64(b.MaxTimestamp)
  41. pe.putInt64(b.ProducerID)
  42. pe.putInt16(b.ProducerEpoch)
  43. pe.putInt32(b.FirstSequence)
  44. if err := pe.putArrayLength(len(b.Records)); err != nil {
  45. return err
  46. }
  47. if b.compressedRecords != nil {
  48. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  49. return err
  50. }
  51. if err := pe.pop(); err != nil {
  52. return err
  53. }
  54. return pe.pop()
  55. }
  56. var re packetEncoder
  57. var raw []byte
  58. switch b.Codec {
  59. case CompressionNone:
  60. re = pe
  61. case CompressionGZIP, CompressionLZ4, CompressionSnappy:
  62. if err := b.computeRecordsLength(); err != nil {
  63. return err
  64. }
  65. raw = make([]byte, b.recordsLen)
  66. re = &realEncoder{raw: raw}
  67. default:
  68. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  69. }
  70. for _, r := range b.Records {
  71. if err := r.encode(re); err != nil {
  72. return err
  73. }
  74. }
  75. switch b.Codec {
  76. case CompressionGZIP:
  77. var buf bytes.Buffer
  78. writer := gzip.NewWriter(&buf)
  79. if _, err := writer.Write(raw); err != nil {
  80. return err
  81. }
  82. if err := writer.Close(); err != nil {
  83. return err
  84. }
  85. b.compressedRecords = buf.Bytes()
  86. case CompressionSnappy:
  87. b.compressedRecords = snappy.Encode(raw)
  88. case CompressionLZ4:
  89. var buf bytes.Buffer
  90. writer := lz4.NewWriter(&buf)
  91. if _, err := writer.Write(raw); err != nil {
  92. return err
  93. }
  94. if err := writer.Close(); err != nil {
  95. return err
  96. }
  97. b.compressedRecords = buf.Bytes()
  98. }
  99. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  100. return err
  101. }
  102. if err := pe.pop(); err != nil {
  103. return err
  104. }
  105. return pe.pop()
  106. }
  107. func (b *RecordBatch) decode(pd packetDecoder) (err error) {
  108. if b.FirstOffset, err = pd.getInt64(); err != nil {
  109. return err
  110. }
  111. var batchLen int32
  112. if batchLen, err = pd.getInt32(); err != nil {
  113. return err
  114. }
  115. if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
  116. return err
  117. }
  118. if b.Version, err = pd.getInt8(); err != nil {
  119. return err
  120. }
  121. if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
  122. return err
  123. }
  124. var attributes int16
  125. if attributes, err = pd.getInt16(); err != nil {
  126. return err
  127. }
  128. b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
  129. b.Control = attributes&controlMask == controlMask
  130. if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
  131. return err
  132. }
  133. if b.FirstTimestamp, err = pd.getInt64(); err != nil {
  134. return err
  135. }
  136. if b.MaxTimestamp, err = pd.getInt64(); err != nil {
  137. return err
  138. }
  139. if b.ProducerID, err = pd.getInt64(); err != nil {
  140. return err
  141. }
  142. if b.ProducerEpoch, err = pd.getInt16(); err != nil {
  143. return err
  144. }
  145. if b.FirstSequence, err = pd.getInt32(); err != nil {
  146. return err
  147. }
  148. numRecs, err := pd.getArrayLength()
  149. if err != nil {
  150. return err
  151. }
  152. if numRecs >= 0 {
  153. b.Records = make([]*Record, numRecs)
  154. }
  155. bufSize := int(batchLen) - recordBatchOverhead
  156. recBuffer, err := pd.getRawBytes(bufSize)
  157. if err != nil {
  158. return err
  159. }
  160. if err = pd.pop(); err != nil {
  161. return err
  162. }
  163. switch b.Codec {
  164. case CompressionNone:
  165. case CompressionGZIP:
  166. reader, err := gzip.NewReader(bytes.NewReader(recBuffer))
  167. if err != nil {
  168. return err
  169. }
  170. if recBuffer, err = ioutil.ReadAll(reader); err != nil {
  171. return err
  172. }
  173. case CompressionSnappy:
  174. if recBuffer, err = snappy.Decode(recBuffer); err != nil {
  175. return err
  176. }
  177. case CompressionLZ4:
  178. reader := lz4.NewReader(bytes.NewReader(recBuffer))
  179. if recBuffer, err = ioutil.ReadAll(reader); err != nil {
  180. return err
  181. }
  182. default:
  183. return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
  184. }
  185. recPd := &realDecoder{raw: recBuffer}
  186. for i := 0; i < numRecs; i++ {
  187. rec := &Record{}
  188. if err = rec.decode(recPd); err != nil {
  189. if err == ErrInsufficientData {
  190. b.PartialTrailingRecord = true
  191. b.Records = nil
  192. return nil
  193. }
  194. return err
  195. }
  196. b.Records[i] = rec
  197. }
  198. return nil
  199. }
  200. func (b *RecordBatch) computeAttributes() int16 {
  201. attr := int16(b.Codec) & int16(compressionCodecMask)
  202. if b.Control {
  203. attr |= controlMask
  204. }
  205. return attr
  206. }
  207. func (b *RecordBatch) computeRecordsLength() error {
  208. b.recordsLen = 0
  209. for _, r := range b.Records {
  210. l, err := r.getTotalLength()
  211. if err != nil {
  212. return err
  213. }
  214. b.recordsLen += l
  215. }
  216. return nil
  217. }
  218. func (b *RecordBatch) addRecord(r *Record) {
  219. b.Records = append(b.Records, r)
  220. }