record_batch.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. package sarama
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "io/ioutil"
  7. "github.com/eapache/go-xerial-snappy"
  8. "github.com/pierrec/lz4"
  9. )
// recordBatchOverhead is the number of bytes counted by a record batch's
// length field that precede the records themselves. It is the sum of the
// fixed-size fields decode reads after the length: PartitionLeaderEpoch (4),
// Version (1), CRC (4), attributes (2), LastOffsetDelta (4), FirstTimestamp
// (8), MaxTimestamp (8), ProducerID (8), ProducerEpoch (2), FirstSequence (4)
// and the record count (4, assuming getArrayLength reads an int32).
const recordBatchOverhead = 49
// recordsArray encodes and decodes a run of records back to back, without a
// count prefix; RecordBatch writes the count separately, and decode fills a
// slice that has already been sized to that count.
type recordsArray []*Record
  12. func (e recordsArray) encode(pe packetEncoder) error {
  13. for _, r := range e {
  14. if err := r.encode(pe); err != nil {
  15. return err
  16. }
  17. }
  18. return nil
  19. }
  20. func (e recordsArray) decode(pd packetDecoder) error {
  21. for i := range e {
  22. rec := &Record{}
  23. if err := rec.decode(pd); err != nil {
  24. return err
  25. }
  26. e[i] = rec
  27. }
  28. return nil
  29. }
// RecordBatch is the in-memory form of a record batch as read by decode and
// written by encode in this file (encode only supports Version 2).
type RecordBatch struct {
	FirstOffset          int64
	PartitionLeaderEpoch int32
	// Version is the batch format version; encode rejects anything but 2.
	Version int8
	// Codec is carried in the attribute bits selected by compressionCodecMask.
	Codec CompressionCodec
	// Control is carried in the attribute bit selected by controlMask.
	Control         bool
	LastOffsetDelta int32
	FirstTimestamp  int64
	MaxTimestamp    int64
	ProducerID      int64
	ProducerEpoch   int16
	FirstSequence   int32
	Records         []*Record
	// PartialTrailingRecord is set by decode when decoding the records
	// returns ErrInsufficientData; Records is then set to nil.
	PartialTrailingRecord bool
	// compressedRecords caches the compressed records built by encodeRecords;
	// encode reuses it instead of re-encoding when it is non-nil.
	compressedRecords []byte
	recordsLen        int // uncompressed records size
}
  47. func (b *RecordBatch) encode(pe packetEncoder) error {
  48. if b.Version != 2 {
  49. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  50. }
  51. pe.putInt64(b.FirstOffset)
  52. pe.push(&lengthField{})
  53. pe.putInt32(b.PartitionLeaderEpoch)
  54. pe.putInt8(b.Version)
  55. pe.push(newCRC32Field(crcCastagnoli))
  56. pe.putInt16(b.computeAttributes())
  57. pe.putInt32(b.LastOffsetDelta)
  58. pe.putInt64(b.FirstTimestamp)
  59. pe.putInt64(b.MaxTimestamp)
  60. pe.putInt64(b.ProducerID)
  61. pe.putInt16(b.ProducerEpoch)
  62. pe.putInt32(b.FirstSequence)
  63. if err := pe.putArrayLength(len(b.Records)); err != nil {
  64. return err
  65. }
  66. if b.compressedRecords == nil {
  67. if err := b.encodeRecords(pe); err != nil {
  68. return err
  69. }
  70. }
  71. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  72. return err
  73. }
  74. if err := pe.pop(); err != nil {
  75. return err
  76. }
  77. return pe.pop()
  78. }
  79. func (b *RecordBatch) decode(pd packetDecoder) (err error) {
  80. if b.FirstOffset, err = pd.getInt64(); err != nil {
  81. return err
  82. }
  83. batchLen, err := pd.getInt32()
  84. if err != nil {
  85. return err
  86. }
  87. if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
  88. return err
  89. }
  90. if b.Version, err = pd.getInt8(); err != nil {
  91. return err
  92. }
  93. if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
  94. return err
  95. }
  96. attributes, err := pd.getInt16()
  97. if err != nil {
  98. return err
  99. }
  100. b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
  101. b.Control = attributes&controlMask == controlMask
  102. if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
  103. return err
  104. }
  105. if b.FirstTimestamp, err = pd.getInt64(); err != nil {
  106. return err
  107. }
  108. if b.MaxTimestamp, err = pd.getInt64(); err != nil {
  109. return err
  110. }
  111. if b.ProducerID, err = pd.getInt64(); err != nil {
  112. return err
  113. }
  114. if b.ProducerEpoch, err = pd.getInt16(); err != nil {
  115. return err
  116. }
  117. if b.FirstSequence, err = pd.getInt32(); err != nil {
  118. return err
  119. }
  120. numRecs, err := pd.getArrayLength()
  121. if err != nil {
  122. return err
  123. }
  124. if numRecs >= 0 {
  125. b.Records = make([]*Record, numRecs)
  126. }
  127. bufSize := int(batchLen) - recordBatchOverhead
  128. recBuffer, err := pd.getRawBytes(bufSize)
  129. if err != nil {
  130. return err
  131. }
  132. if err = pd.pop(); err != nil {
  133. return err
  134. }
  135. switch b.Codec {
  136. case CompressionNone:
  137. case CompressionGZIP:
  138. reader, err := gzip.NewReader(bytes.NewReader(recBuffer))
  139. if err != nil {
  140. return err
  141. }
  142. if recBuffer, err = ioutil.ReadAll(reader); err != nil {
  143. return err
  144. }
  145. case CompressionSnappy:
  146. if recBuffer, err = snappy.Decode(recBuffer); err != nil {
  147. return err
  148. }
  149. case CompressionLZ4:
  150. reader := lz4.NewReader(bytes.NewReader(recBuffer))
  151. if recBuffer, err = ioutil.ReadAll(reader); err != nil {
  152. return err
  153. }
  154. default:
  155. return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
  156. }
  157. err = decode(recBuffer, recordsArray(b.Records))
  158. if err == ErrInsufficientData {
  159. b.PartialTrailingRecord = true
  160. b.Records = nil
  161. return nil
  162. }
  163. return err
  164. }
  165. func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
  166. var raw []byte
  167. if b.Codec != CompressionNone {
  168. var err error
  169. if raw, err = encode(recordsArray(b.Records), nil); err != nil {
  170. return err
  171. }
  172. b.recordsLen = len(raw)
  173. }
  174. switch b.Codec {
  175. case CompressionNone:
  176. offset := pe.offset()
  177. if err := recordsArray(b.Records).encode(pe); err != nil {
  178. return err
  179. }
  180. b.recordsLen = pe.offset() - offset
  181. case CompressionGZIP:
  182. var buf bytes.Buffer
  183. writer := gzip.NewWriter(&buf)
  184. if _, err := writer.Write(raw); err != nil {
  185. return err
  186. }
  187. if err := writer.Close(); err != nil {
  188. return err
  189. }
  190. b.compressedRecords = buf.Bytes()
  191. case CompressionSnappy:
  192. b.compressedRecords = snappy.Encode(raw)
  193. case CompressionLZ4:
  194. var buf bytes.Buffer
  195. writer := lz4.NewWriter(&buf)
  196. if _, err := writer.Write(raw); err != nil {
  197. return err
  198. }
  199. if err := writer.Close(); err != nil {
  200. return err
  201. }
  202. b.compressedRecords = buf.Bytes()
  203. default:
  204. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  205. }
  206. return nil
  207. }
  208. func (b *RecordBatch) computeAttributes() int16 {
  209. attr := int16(b.Codec) & int16(compressionCodecMask)
  210. if b.Control {
  211. attr |= controlMask
  212. }
  213. return attr
  214. }
  215. func (b *RecordBatch) addRecord(r *Record) {
  216. b.Records = append(b.Records, r)
  217. }