record_batch.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package sarama
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "io/ioutil"
  7. "github.com/eapache/go-xerial-snappy"
  8. "github.com/pierrec/lz4"
  9. )
  // recordBatchOverhead is the number of bytes counted by a record batch's
  // length field that precede the records payload: PartitionLeaderEpoch (4),
  // Version (1), CRC (4), attributes (2), LastOffsetDelta (4),
  // FirstTimestamp (8), MaxTimestamp (8), ProducerID (8), ProducerEpoch (2),
  // FirstSequence (4) and the record count (4). decode subtracts it from the
  // batch length to size the raw records section.
  const recordBatchOverhead = 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4 + 4
  11. type RecordBatch struct {
  12. FirstOffset int64
  13. PartitionLeaderEpoch int32
  14. Version int8
  15. Codec CompressionCodec
  16. Control bool
  17. LastOffsetDelta int32
  18. FirstTimestamp int64
  19. MaxTimestamp int64
  20. ProducerID int64
  21. ProducerEpoch int16
  22. FirstSequence int32
  23. Records []*Record
  24. PartialTrailingRecord bool
  25. compressedRecords []byte
  26. recordsLen int
  27. }
  28. func (b *RecordBatch) encode(pe packetEncoder) error {
  29. if b.Version != 2 {
  30. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  31. }
  32. pe.putInt64(b.FirstOffset)
  33. pe.push(&lengthField{})
  34. pe.putInt32(b.PartitionLeaderEpoch)
  35. pe.putInt8(b.Version)
  36. pe.push(newCRC32Field(crcCastagnoli))
  37. pe.putInt16(b.computeAttributes())
  38. pe.putInt32(b.LastOffsetDelta)
  39. pe.putInt64(b.FirstTimestamp)
  40. pe.putInt64(b.MaxTimestamp)
  41. pe.putInt64(b.ProducerID)
  42. pe.putInt16(b.ProducerEpoch)
  43. pe.putInt32(b.FirstSequence)
  44. if err := pe.putArrayLength(len(b.Records)); err != nil {
  45. return err
  46. }
  47. if b.compressedRecords != nil {
  48. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  49. return err
  50. }
  51. if err := pe.pop(); err != nil {
  52. return err
  53. }
  54. if err := pe.pop(); err != nil {
  55. return err
  56. }
  57. return nil
  58. }
  59. var re packetEncoder
  60. var raw []byte
  61. switch b.Codec {
  62. case CompressionNone:
  63. re = pe
  64. case CompressionGZIP, CompressionLZ4, CompressionSnappy:
  65. for _, r := range b.Records {
  66. l, err := r.getTotalLength()
  67. if err != nil {
  68. return err
  69. }
  70. b.recordsLen += l
  71. }
  72. raw = make([]byte, b.recordsLen)
  73. re = &realEncoder{raw: raw}
  74. default:
  75. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  76. }
  77. for _, r := range b.Records {
  78. if err := r.encode(re); err != nil {
  79. return err
  80. }
  81. }
  82. switch b.Codec {
  83. case CompressionGZIP:
  84. var buf bytes.Buffer
  85. writer := gzip.NewWriter(&buf)
  86. if _, err := writer.Write(raw); err != nil {
  87. return err
  88. }
  89. if err := writer.Close(); err != nil {
  90. return err
  91. }
  92. b.compressedRecords = buf.Bytes()
  93. case CompressionSnappy:
  94. b.compressedRecords = snappy.Encode(raw)
  95. case CompressionLZ4:
  96. var buf bytes.Buffer
  97. writer := lz4.NewWriter(&buf)
  98. if _, err := writer.Write(raw); err != nil {
  99. return err
  100. }
  101. if err := writer.Close(); err != nil {
  102. return err
  103. }
  104. b.compressedRecords = buf.Bytes()
  105. }
  106. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  107. return err
  108. }
  109. if err := pe.pop(); err != nil {
  110. return err
  111. }
  112. if err := pe.pop(); err != nil {
  113. return err
  114. }
  115. return nil
  116. }
  117. func (b *RecordBatch) decode(pd packetDecoder) (err error) {
  118. if b.FirstOffset, err = pd.getInt64(); err != nil {
  119. return err
  120. }
  121. var batchLen int32
  122. if batchLen, err = pd.getInt32(); err != nil {
  123. return err
  124. }
  125. if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
  126. return err
  127. }
  128. if b.Version, err = pd.getInt8(); err != nil {
  129. return err
  130. }
  131. if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
  132. return err
  133. }
  134. var attributes int16
  135. if attributes, err = pd.getInt16(); err != nil {
  136. return err
  137. }
  138. b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
  139. b.Control = attributes&controlMask == controlMask
  140. if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
  141. return err
  142. }
  143. if b.FirstTimestamp, err = pd.getInt64(); err != nil {
  144. return err
  145. }
  146. if b.MaxTimestamp, err = pd.getInt64(); err != nil {
  147. return err
  148. }
  149. if b.ProducerID, err = pd.getInt64(); err != nil {
  150. return err
  151. }
  152. if b.ProducerEpoch, err = pd.getInt16(); err != nil {
  153. return err
  154. }
  155. if b.FirstSequence, err = pd.getInt32(); err != nil {
  156. return err
  157. }
  158. numRecs, err := pd.getArrayLength()
  159. if err != nil {
  160. return err
  161. }
  162. if numRecs >= 0 {
  163. b.Records = make([]*Record, numRecs)
  164. }
  165. bufSize := int(batchLen) - recordBatchOverhead
  166. recBuffer, err := pd.getRawBytes(bufSize)
  167. if err != nil {
  168. return err
  169. }
  170. if err = pd.pop(); err != nil {
  171. return err
  172. }
  173. switch b.Codec {
  174. case CompressionNone:
  175. case CompressionGZIP:
  176. reader, err := gzip.NewReader(bytes.NewReader(recBuffer))
  177. if err != nil {
  178. return err
  179. }
  180. if recBuffer, err = ioutil.ReadAll(reader); err != nil {
  181. return err
  182. }
  183. case CompressionSnappy:
  184. if recBuffer, err = snappy.Decode(recBuffer); err != nil {
  185. return err
  186. }
  187. case CompressionLZ4:
  188. reader := lz4.NewReader(bytes.NewReader(recBuffer))
  189. if recBuffer, err = ioutil.ReadAll(reader); err != nil {
  190. return err
  191. }
  192. default:
  193. return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
  194. }
  195. recPd := &realDecoder{raw: recBuffer}
  196. for i := 0; i < numRecs; i++ {
  197. rec := &Record{}
  198. if err = rec.decode(recPd); err != nil {
  199. if err == ErrInsufficientData {
  200. b.PartialTrailingRecord = true
  201. b.Records = nil
  202. return nil
  203. }
  204. return err
  205. }
  206. b.Records[i] = rec
  207. }
  208. return nil
  209. }
  210. func (b *RecordBatch) computeAttributes() int16 {
  211. attr := int16(b.Codec) & int16(compressionCodecMask)
  212. if b.Control {
  213. attr |= controlMask
  214. }
  215. return attr
  216. }
  217. func (b *RecordBatch) addRecord(r *Record) {
  218. b.Records = append(b.Records, r)
  219. }