record_batch.go

package sarama

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io/ioutil"
	"time"

	"github.com/eapache/go-xerial-snappy"
	"github.com/pierrec/lz4"
)
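
// recordBatchOverhead is the size in bytes of the batch header fields that the
// on-wire batch length covers ahead of the records themselves (partition leader
// epoch through the record count); decode subtracts it from the batch length to
// size the records payload.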
const recordBatchOverhead = 49
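
// recordsArray wraps a []*Record so a whole record set can be encoded or
// decoded in one call.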
type recordsArray []*Record

func (e recordsArray) encode(pe packetEncoder) error {
	for _, r := range e {
		if err := r.encode(pe); err != nil {
			return err
		}
	}
	return nil
}

func (e recordsArray) decode(pd packetDecoder) error {
	for i := range e {
		rec := &Record{}
		if err := rec.decode(pd); err != nil {
			return err
		}
		e[i] = rec
	}
	return nil
}
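
// RecordBatch is the v2 (magic 2) batch format introduced in Kafka 0.11: a
// fixed header followed by a set of records that is stored, and optionally
// compressed, as a single block.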
type RecordBatch struct {
	FirstOffset           int64
	PartitionLeaderEpoch  int32
	Version               int8
	Codec                 CompressionCodec
	Control               bool
	LastOffsetDelta       int32
	FirstTimestamp        time.Time
	MaxTimestamp          time.Time
	ProducerID            int64
	ProducerEpoch         int16
	FirstSequence         int32
	Records               []*Record
	PartialTrailingRecord bool

	compressedRecords []byte
	recordsLen        int // uncompressed records size
}
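
// encode writes the v2 batch header followed by the record set. The batch
// length and CRC-32C fields are back-filled by the pushed lengthField and
// crc32Field when they are popped.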
func (b *RecordBatch) encode(pe packetEncoder) error {
	if b.Version != 2 {
		return PacketEncodingError{fmt.Sprintf("unsupported record batch version (%d)", b.Version)}
	}
	pe.putInt64(b.FirstOffset)
	pe.push(&lengthField{})
	pe.putInt32(b.PartitionLeaderEpoch)
	pe.putInt8(b.Version)
	pe.push(newCRC32Field(crcCastagnoli))
	pe.putInt16(b.computeAttributes())
	pe.putInt32(b.LastOffsetDelta)

	if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
		return err
	}

	if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
		return err
	}

	pe.putInt64(b.ProducerID)
	pe.putInt16(b.ProducerEpoch)
	pe.putInt32(b.FirstSequence)

	if err := pe.putArrayLength(len(b.Records)); err != nil {
		return err
	}

	if b.compressedRecords == nil {
		if err := b.encodeRecords(pe); err != nil {
			return err
		}
	}
	if err := pe.putRawBytes(b.compressedRecords); err != nil {
		return err
	}

	if err := pe.pop(); err != nil {
		return err
	}

	return pe.pop()
}
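
// decode reads a v2 record batch: the header fields first, then the records
// payload, which is decompressed according to the codec carried in the
// attributes before the individual records are decoded. A trailing record
// truncated by the fetch size is flagged via PartialTrailingRecord rather than
// treated as an error.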
func (b *RecordBatch) decode(pd packetDecoder) (err error) {
	if b.FirstOffset, err = pd.getInt64(); err != nil {
		return err
	}

	batchLen, err := pd.getInt32()
	if err != nil {
		return err
	}

	if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
		return err
	}

	if b.Version, err = pd.getInt8(); err != nil {
		return err
	}

	if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
		return err
	}

	attributes, err := pd.getInt16()
	if err != nil {
		return err
	}
	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
	b.Control = attributes&controlMask == controlMask

	if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
		return err
	}

	if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
		return err
	}

	if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
		return err
	}

	if b.ProducerID, err = pd.getInt64(); err != nil {
		return err
	}

	if b.ProducerEpoch, err = pd.getInt16(); err != nil {
		return err
	}

	if b.FirstSequence, err = pd.getInt32(); err != nil {
		return err
	}

	numRecs, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	if numRecs >= 0 {
		b.Records = make([]*Record, numRecs)
	}

	bufSize := int(batchLen) - recordBatchOverhead
	recBuffer, err := pd.getRawBytes(bufSize)
	if err != nil {
		return err
	}

	if err = pd.pop(); err != nil {
		return err
	}

	switch b.Codec {
	case CompressionNone:
	case CompressionGZIP:
		reader, err := gzip.NewReader(bytes.NewReader(recBuffer))
		if err != nil {
			return err
		}
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	case CompressionSnappy:
		if recBuffer, err = snappy.Decode(recBuffer); err != nil {
			return err
		}
	case CompressionLZ4:
		reader := lz4.NewReader(bytes.NewReader(recBuffer))
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	default:
		return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
	}

	err = decode(recBuffer, recordsArray(b.Records))
	if err == ErrInsufficientData {
		b.PartialTrailingRecord = true
		b.Records = nil
		return nil
	}
	return err
}
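
// encodeRecords serializes the record set. With CompressionNone the records
// are written straight to pe; for any other codec the serialized records are
// compressed and cached in compressedRecords so encode can emit them with
// putRawBytes.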
func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
	var raw []byte
	if b.Codec != CompressionNone {
		var err error
		if raw, err = encode(recordsArray(b.Records), nil); err != nil {
			return err
		}
		b.recordsLen = len(raw)
	}

	switch b.Codec {
	case CompressionNone:
		offset := pe.offset()
		if err := recordsArray(b.Records).encode(pe); err != nil {
			return err
		}
		b.recordsLen = pe.offset() - offset
	case CompressionGZIP:
		var buf bytes.Buffer
		writer := gzip.NewWriter(&buf)
		if _, err := writer.Write(raw); err != nil {
			return err
		}
		if err := writer.Close(); err != nil {
			return err
		}
		b.compressedRecords = buf.Bytes()
	case CompressionSnappy:
		b.compressedRecords = snappy.Encode(raw)
	case CompressionLZ4:
		var buf bytes.Buffer
		writer := lz4.NewWriter(&buf)
		if _, err := writer.Write(raw); err != nil {
			return err
		}
		if err := writer.Close(); err != nil {
			return err
		}
		b.compressedRecords = buf.Bytes()
	default:
		return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
	}

	return nil
}
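
// computeAttributes packs the compression codec and the control-batch flag
// into the batch's attributes field.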
func (b *RecordBatch) computeAttributes() int16 {
	attr := int16(b.Codec) & int16(compressionCodecMask)
	if b.Control {
		attr |= controlMask
	}
	return attr
}
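
// addRecord appends a record to the batch.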
func (b *RecordBatch) addRecord(r *Record) {
	b.Records = append(b.Records, r)
}