// record_batch.go (5.2 KB)

  1. package sarama
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "io/ioutil"
  7. "github.com/eapache/go-xerial-snappy"
  8. "github.com/pierrec/lz4"
  9. )
  10. const recordBatchOverhead = 49
  11. type recordsArray []*Record
  12. func (e recordsArray) encode(pe packetEncoder) error {
  13. for _, r := range e {
  14. if err := r.encode(pe); err != nil {
  15. return err
  16. }
  17. }
  18. return nil
  19. }
  20. func (e recordsArray) decode(pd packetDecoder) error {
  21. for i := range e {
  22. rec := &Record{}
  23. if err := rec.decode(pd); err != nil {
  24. return err
  25. }
  26. e[i] = rec
  27. }
  28. return nil
  29. }
  30. type RecordBatch struct {
  31. FirstOffset int64
  32. PartitionLeaderEpoch int32
  33. Version int8
  34. Codec CompressionCodec
  35. Control bool
  36. LastOffsetDelta int32
  37. FirstTimestamp int64
  38. MaxTimestamp int64
  39. ProducerID int64
  40. ProducerEpoch int16
  41. FirstSequence int32
  42. Records []*Record
  43. PartialTrailingRecord bool
  44. compressedRecords []byte
  45. recordsLen int // uncompressed records size
  46. }
  47. func (b *RecordBatch) encode(pe packetEncoder) error {
  48. if b.Version != 2 {
  49. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  50. }
  51. pe.putInt64(b.FirstOffset)
  52. pe.push(&lengthField{})
  53. pe.putInt32(b.PartitionLeaderEpoch)
  54. pe.putInt8(b.Version)
  55. pe.push(newCRC32Field(crcCastagnoli))
  56. pe.putInt16(b.computeAttributes())
  57. pe.putInt32(b.LastOffsetDelta)
  58. pe.putInt64(b.FirstTimestamp)
  59. pe.putInt64(b.MaxTimestamp)
  60. pe.putInt64(b.ProducerID)
  61. pe.putInt16(b.ProducerEpoch)
  62. pe.putInt32(b.FirstSequence)
  63. if err := pe.putArrayLength(len(b.Records)); err != nil {
  64. return err
  65. }
  66. if b.compressedRecords == nil {
  67. b.encodeRecords(pe)
  68. }
  69. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  70. return err
  71. }
  72. if err := pe.pop(); err != nil {
  73. return err
  74. }
  75. return pe.pop()
  76. }
  77. func (b *RecordBatch) decode(pd packetDecoder) (err error) {
  78. if b.FirstOffset, err = pd.getInt64(); err != nil {
  79. return err
  80. }
  81. batchLen, err := pd.getInt32()
  82. if err != nil {
  83. return err
  84. }
  85. if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
  86. return err
  87. }
  88. if b.Version, err = pd.getInt8(); err != nil {
  89. return err
  90. }
  91. if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
  92. return err
  93. }
  94. attributes, err := pd.getInt16()
  95. if err != nil {
  96. return err
  97. }
  98. b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
  99. b.Control = attributes&controlMask == controlMask
  100. if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
  101. return err
  102. }
  103. if b.FirstTimestamp, err = pd.getInt64(); err != nil {
  104. return err
  105. }
  106. if b.MaxTimestamp, err = pd.getInt64(); err != nil {
  107. return err
  108. }
  109. if b.ProducerID, err = pd.getInt64(); err != nil {
  110. return err
  111. }
  112. if b.ProducerEpoch, err = pd.getInt16(); err != nil {
  113. return err
  114. }
  115. if b.FirstSequence, err = pd.getInt32(); err != nil {
  116. return err
  117. }
  118. numRecs, err := pd.getArrayLength()
  119. if err != nil {
  120. return err
  121. }
  122. if numRecs >= 0 {
  123. b.Records = make([]*Record, numRecs)
  124. }
  125. bufSize := int(batchLen) - recordBatchOverhead
  126. recBuffer, err := pd.getRawBytes(bufSize)
  127. if err != nil {
  128. return err
  129. }
  130. if err = pd.pop(); err != nil {
  131. return err
  132. }
  133. switch b.Codec {
  134. case CompressionNone:
  135. case CompressionGZIP:
  136. reader, err := gzip.NewReader(bytes.NewReader(recBuffer))
  137. if err != nil {
  138. return err
  139. }
  140. if recBuffer, err = ioutil.ReadAll(reader); err != nil {
  141. return err
  142. }
  143. case CompressionSnappy:
  144. if recBuffer, err = snappy.Decode(recBuffer); err != nil {
  145. return err
  146. }
  147. case CompressionLZ4:
  148. reader := lz4.NewReader(bytes.NewReader(recBuffer))
  149. if recBuffer, err = ioutil.ReadAll(reader); err != nil {
  150. return err
  151. }
  152. default:
  153. return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
  154. }
  155. err = decode(recBuffer, recordsArray(b.Records))
  156. if err == ErrInsufficientData {
  157. b.PartialTrailingRecord = true
  158. b.Records = nil
  159. return nil
  160. }
  161. return err
  162. }
  163. func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
  164. var raw []byte
  165. if b.Codec != CompressionNone {
  166. var err error
  167. if raw, err = encode(recordsArray(b.Records), nil); err != nil {
  168. return err
  169. }
  170. b.recordsLen = len(raw)
  171. }
  172. switch b.Codec {
  173. case CompressionNone:
  174. offset := pe.offset()
  175. if err := recordsArray(b.Records).encode(pe); err != nil {
  176. return err
  177. }
  178. b.recordsLen = pe.offset() - offset
  179. case CompressionGZIP:
  180. var buf bytes.Buffer
  181. writer := gzip.NewWriter(&buf)
  182. if _, err := writer.Write(raw); err != nil {
  183. return err
  184. }
  185. if err := writer.Close(); err != nil {
  186. return err
  187. }
  188. b.compressedRecords = buf.Bytes()
  189. case CompressionSnappy:
  190. b.compressedRecords = snappy.Encode(raw)
  191. case CompressionLZ4:
  192. var buf bytes.Buffer
  193. writer := lz4.NewWriter(&buf)
  194. if _, err := writer.Write(raw); err != nil {
  195. return err
  196. }
  197. if err := writer.Close(); err != nil {
  198. return err
  199. }
  200. b.compressedRecords = buf.Bytes()
  201. default:
  202. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  203. }
  204. return nil
  205. }
  206. func (b *RecordBatch) computeAttributes() int16 {
  207. attr := int16(b.Codec) & int16(compressionCodecMask)
  208. if b.Control {
  209. attr |= controlMask
  210. }
  211. return attr
  212. }
  213. func (b *RecordBatch) addRecord(r *Record) {
  214. b.Records = append(b.Records, r)
  215. }