record_batch.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. package sarama
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "io/ioutil"
  7. "github.com/eapache/go-xerial-snappy"
  8. "github.com/pierrec/lz4"
  9. )
// recordBatchOverhead is subtracted from the batch length read by
// RecordBatch.decode to obtain the size of the (possibly compressed) records
// payload.
// NOTE(review): 49 matches the sum of the header fields decode reads after the
// length field (4+1+4+2+4+8+8+8+2+4+4), assuming a 4-byte CRC and a 4-byte
// record count — confirm against the wire format.
const recordBatchOverhead = 49
// recordsArray wraps a slice of records so the whole slice can be handed to
// the package-level encode and decode helpers as one unit.
type recordsArray []*Record
  12. func (e recordsArray) encode(pe packetEncoder) error {
  13. for _, r := range e {
  14. if err := r.encode(pe); err != nil {
  15. return err
  16. }
  17. }
  18. return nil
  19. }
  20. func (e recordsArray) decode(pd packetDecoder) error {
  21. for i := range e {
  22. rec := &Record{}
  23. if err := rec.decode(pd); err != nil {
  24. return err
  25. }
  26. e[i] = rec
  27. }
  28. return nil
  29. }
// RecordBatch is the in-memory form of a record batch. Its header fields are
// written by encode and read by decode in the order they are declared here.
type RecordBatch struct {
	// FirstOffset is the first value on the wire, ahead of the length field.
	FirstOffset int64
	// PartitionLeaderEpoch and Version follow the length field and precede
	// the CRC. encode rejects any Version other than 2.
	PartitionLeaderEpoch int32
	Version              int8
	// Codec and Control are packed into the 16-bit attributes field
	// (see computeAttributes).
	Codec   CompressionCodec
	Control bool
	// The following header fields are copied to and from the wire as-is.
	LastOffsetDelta int32
	FirstTimestamp  int64
	MaxTimestamp    int64
	ProducerID      int64
	ProducerEpoch   int16
	FirstSequence   int32
	// Records holds the batch's records. decode resets it to nil when the
	// records payload turns out to be truncated.
	Records []*Record
	// PartialTrailingRecord is set by decode when decoding the records
	// payload fails with ErrInsufficientData.
	PartialTrailingRecord bool

	// compressedRecords caches the compressed records payload produced by
	// encode; when non-nil, encode writes it verbatim instead of compressing
	// again.
	compressedRecords []byte
	recordsLen        int // uncompressed records size
}
  47. func (b *RecordBatch) encode(pe packetEncoder) error {
  48. if b.Version != 2 {
  49. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  50. }
  51. pe.putInt64(b.FirstOffset)
  52. pe.push(&lengthField{})
  53. pe.putInt32(b.PartitionLeaderEpoch)
  54. pe.putInt8(b.Version)
  55. pe.push(newCRC32Field(crcCastagnoli))
  56. pe.putInt16(b.computeAttributes())
  57. pe.putInt32(b.LastOffsetDelta)
  58. pe.putInt64(b.FirstTimestamp)
  59. pe.putInt64(b.MaxTimestamp)
  60. pe.putInt64(b.ProducerID)
  61. pe.putInt16(b.ProducerEpoch)
  62. pe.putInt32(b.FirstSequence)
  63. if err := pe.putArrayLength(len(b.Records)); err != nil {
  64. return err
  65. }
  66. if b.compressedRecords != nil {
  67. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  68. return err
  69. }
  70. if err := pe.pop(); err != nil {
  71. return err
  72. }
  73. return pe.pop()
  74. }
  75. var raw []byte
  76. if b.Codec != CompressionNone {
  77. var err error
  78. if raw, err = encode(recordsArray(b.Records), nil); err != nil {
  79. return err
  80. }
  81. b.recordsLen = len(raw)
  82. }
  83. switch b.Codec {
  84. case CompressionNone:
  85. offset := pe.offset()
  86. if err := recordsArray(b.Records).encode(pe); err != nil {
  87. return err
  88. }
  89. b.recordsLen = pe.offset() - offset
  90. case CompressionGZIP:
  91. var buf bytes.Buffer
  92. writer := gzip.NewWriter(&buf)
  93. if _, err := writer.Write(raw); err != nil {
  94. return err
  95. }
  96. if err := writer.Close(); err != nil {
  97. return err
  98. }
  99. b.compressedRecords = buf.Bytes()
  100. case CompressionSnappy:
  101. b.compressedRecords = snappy.Encode(raw)
  102. case CompressionLZ4:
  103. var buf bytes.Buffer
  104. writer := lz4.NewWriter(&buf)
  105. if _, err := writer.Write(raw); err != nil {
  106. return err
  107. }
  108. if err := writer.Close(); err != nil {
  109. return err
  110. }
  111. b.compressedRecords = buf.Bytes()
  112. default:
  113. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  114. }
  115. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  116. return err
  117. }
  118. if err := pe.pop(); err != nil {
  119. return err
  120. }
  121. return pe.pop()
  122. }
  123. func (b *RecordBatch) decode(pd packetDecoder) (err error) {
  124. if b.FirstOffset, err = pd.getInt64(); err != nil {
  125. return err
  126. }
  127. var batchLen int32
  128. if batchLen, err = pd.getInt32(); err != nil {
  129. return err
  130. }
  131. if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
  132. return err
  133. }
  134. if b.Version, err = pd.getInt8(); err != nil {
  135. return err
  136. }
  137. if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
  138. return err
  139. }
  140. var attributes int16
  141. if attributes, err = pd.getInt16(); err != nil {
  142. return err
  143. }
  144. b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
  145. b.Control = attributes&controlMask == controlMask
  146. if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
  147. return err
  148. }
  149. if b.FirstTimestamp, err = pd.getInt64(); err != nil {
  150. return err
  151. }
  152. if b.MaxTimestamp, err = pd.getInt64(); err != nil {
  153. return err
  154. }
  155. if b.ProducerID, err = pd.getInt64(); err != nil {
  156. return err
  157. }
  158. if b.ProducerEpoch, err = pd.getInt16(); err != nil {
  159. return err
  160. }
  161. if b.FirstSequence, err = pd.getInt32(); err != nil {
  162. return err
  163. }
  164. numRecs, err := pd.getArrayLength()
  165. if err != nil {
  166. return err
  167. }
  168. if numRecs >= 0 {
  169. b.Records = make([]*Record, numRecs)
  170. }
  171. bufSize := int(batchLen) - recordBatchOverhead
  172. recBuffer, err := pd.getRawBytes(bufSize)
  173. if err != nil {
  174. return err
  175. }
  176. if err = pd.pop(); err != nil {
  177. return err
  178. }
  179. switch b.Codec {
  180. case CompressionNone:
  181. case CompressionGZIP:
  182. reader, err := gzip.NewReader(bytes.NewReader(recBuffer))
  183. if err != nil {
  184. return err
  185. }
  186. if recBuffer, err = ioutil.ReadAll(reader); err != nil {
  187. return err
  188. }
  189. case CompressionSnappy:
  190. if recBuffer, err = snappy.Decode(recBuffer); err != nil {
  191. return err
  192. }
  193. case CompressionLZ4:
  194. reader := lz4.NewReader(bytes.NewReader(recBuffer))
  195. if recBuffer, err = ioutil.ReadAll(reader); err != nil {
  196. return err
  197. }
  198. default:
  199. return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
  200. }
  201. err = decode(recBuffer, recordsArray(b.Records))
  202. if err == ErrInsufficientData {
  203. b.PartialTrailingRecord = true
  204. b.Records = nil
  205. return nil
  206. }
  207. return err
  208. }
  209. func (b *RecordBatch) computeAttributes() int16 {
  210. attr := int16(b.Codec) & int16(compressionCodecMask)
  211. if b.Control {
  212. attr |= controlMask
  213. }
  214. return attr
  215. }
// addRecord appends r to the end of b.Records. It does not touch any other
// field of the batch (for example LastOffsetDelta or the cached
// compressedRecords).
func (b *RecordBatch) addRecord(r *Record) {
	b.Records = append(b.Records, r)
}