record_batch.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. package sarama
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "io/ioutil"
  7. "time"
  8. "github.com/eapache/go-xerial-snappy"
  9. "github.com/pierrec/lz4"
  10. )
// recordBatchOverhead is the number of bytes in a v2 record batch between the
// batch-length field and the start of the records payload (partition leader
// epoch 4 + magic 1 + CRC 4 + attributes 2 + last offset delta 4 + first
// timestamp 8 + max timestamp 8 + producer ID 8 + producer epoch 2 + first
// sequence 4 + record count 4 = 49). decode subtracts it from the batch
// length to size the raw records buffer.
const recordBatchOverhead = 49

// recordsArray provides bulk encode/decode over a slice of records.
type recordsArray []*Record
  13. func (e recordsArray) encode(pe packetEncoder) error {
  14. for _, r := range e {
  15. if err := r.encode(pe); err != nil {
  16. return err
  17. }
  18. }
  19. return nil
  20. }
  21. func (e recordsArray) decode(pd packetDecoder) error {
  22. for i := range e {
  23. rec := &Record{}
  24. if err := rec.decode(pd); err != nil {
  25. return err
  26. }
  27. e[i] = rec
  28. }
  29. return nil
  30. }
// RecordBatch is the container for Kafka records in the v2 (magic byte 2)
// message format, carrying the batch header fields plus the record set.
type RecordBatch struct {
	FirstOffset          int64
	PartitionLeaderEpoch int32
	Version              int8 // must be 2; encode rejects any other value
	Codec                CompressionCodec
	Control              bool
	LastOffsetDelta      int32
	FirstTimestamp       time.Time
	MaxTimestamp         time.Time
	ProducerID           int64
	ProducerEpoch        int16
	FirstSequence        int32
	Records              []*Record
	// PartialTrailingRecord is set by decode when the payload is truncated
	// mid-batch (ErrInsufficientData); Records is nil in that case.
	PartialTrailingRecord bool

	// compressedRecords caches the compressed payload built by
	// encodeRecords; it stays nil for CompressionNone, where the records
	// are written directly to the encoder.
	compressedRecords []byte
	recordsLen        int // uncompressed records size
}
  48. func (b *RecordBatch) encode(pe packetEncoder) error {
  49. if b.Version != 2 {
  50. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  51. }
  52. pe.putInt64(b.FirstOffset)
  53. pe.push(&lengthField{})
  54. pe.putInt32(b.PartitionLeaderEpoch)
  55. pe.putInt8(b.Version)
  56. pe.push(newCRC32Field(crcCastagnoli))
  57. pe.putInt16(b.computeAttributes())
  58. pe.putInt32(b.LastOffsetDelta)
  59. if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
  60. return err
  61. }
  62. if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
  63. return err
  64. }
  65. pe.putInt64(b.ProducerID)
  66. pe.putInt16(b.ProducerEpoch)
  67. pe.putInt32(b.FirstSequence)
  68. if err := pe.putArrayLength(len(b.Records)); err != nil {
  69. return err
  70. }
  71. if b.compressedRecords == nil {
  72. if err := b.encodeRecords(pe); err != nil {
  73. return err
  74. }
  75. }
  76. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  77. return err
  78. }
  79. if err := pe.pop(); err != nil {
  80. return err
  81. }
  82. return pe.pop()
  83. }
// decode reads a v2 RecordBatch from pd. The records payload is decompressed
// according to the codec carried in the attributes field and then decoded
// into b.Records. A payload truncated by the broker (ErrInsufficientData
// while reading or decoding the records) is not an error: the batch is
// flagged via PartialTrailingRecord, Records is cleared, and nil is returned.
func (b *RecordBatch) decode(pd packetDecoder) (err error) {
	if b.FirstOffset, err = pd.getInt64(); err != nil {
		return err
	}

	// batchLen covers everything after the length field itself; it is used
	// below to size the raw records buffer.
	batchLen, err := pd.getInt32()
	if err != nil {
		return err
	}

	if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
		return err
	}

	if b.Version, err = pd.getInt8(); err != nil {
		return err
	}

	// Everything from here to the end of the batch is covered by the CRC-32C
	// checksum pushed now and verified by the pd.pop() below.
	if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
		return err
	}

	attributes, err := pd.getInt16()
	if err != nil {
		return err
	}
	// Low bits of attributes select the codec; controlMask flags a control batch.
	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
	b.Control = attributes&controlMask == controlMask

	if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
		return err
	}

	if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
		return err
	}

	if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
		return err
	}

	if b.ProducerID, err = pd.getInt64(); err != nil {
		return err
	}

	if b.ProducerEpoch, err = pd.getInt16(); err != nil {
		return err
	}

	if b.FirstSequence, err = pd.getInt32(); err != nil {
		return err
	}

	numRecs, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	// A negative count (null array) leaves Records nil.
	if numRecs >= 0 {
		b.Records = make([]*Record, numRecs)
	}

	// The records payload is whatever remains of the batch after the
	// fixed-size header (recordBatchOverhead bytes past the length field).
	bufSize := int(batchLen) - recordBatchOverhead
	recBuffer, err := pd.getRawBytes(bufSize)
	if err != nil {
		if err == ErrInsufficientData {
			// Truncated trailing batch: report it as partial, not an error.
			b.PartialTrailingRecord = true
			b.Records = nil
			return nil
		}
		return err
	}

	// Verify the CRC before attempting to decompress/decode the payload.
	if err = pd.pop(); err != nil {
		return err
	}

	switch b.Codec {
	case CompressionNone:
	case CompressionGZIP:
		reader, err := gzip.NewReader(bytes.NewReader(recBuffer))
		if err != nil {
			return err
		}
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	case CompressionSnappy:
		if recBuffer, err = snappy.Decode(recBuffer); err != nil {
			return err
		}
	case CompressionLZ4:
		reader := lz4.NewReader(bytes.NewReader(recBuffer))
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	default:
		return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
	}

	b.recordsLen = len(recBuffer)
	err = decode(recBuffer, recordsArray(b.Records))
	if err == ErrInsufficientData {
		// A record was cut off mid-stream; treat as a partial trailing batch.
		b.PartialTrailingRecord = true
		b.Records = nil
		return nil
	}
	return err
}
  176. func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
  177. var raw []byte
  178. if b.Codec != CompressionNone {
  179. var err error
  180. if raw, err = encode(recordsArray(b.Records), nil); err != nil {
  181. return err
  182. }
  183. b.recordsLen = len(raw)
  184. }
  185. switch b.Codec {
  186. case CompressionNone:
  187. offset := pe.offset()
  188. if err := recordsArray(b.Records).encode(pe); err != nil {
  189. return err
  190. }
  191. b.recordsLen = pe.offset() - offset
  192. case CompressionGZIP:
  193. var buf bytes.Buffer
  194. writer := gzip.NewWriter(&buf)
  195. if _, err := writer.Write(raw); err != nil {
  196. return err
  197. }
  198. if err := writer.Close(); err != nil {
  199. return err
  200. }
  201. b.compressedRecords = buf.Bytes()
  202. case CompressionSnappy:
  203. b.compressedRecords = snappy.Encode(raw)
  204. case CompressionLZ4:
  205. var buf bytes.Buffer
  206. writer := lz4.NewWriter(&buf)
  207. if _, err := writer.Write(raw); err != nil {
  208. return err
  209. }
  210. if err := writer.Close(); err != nil {
  211. return err
  212. }
  213. b.compressedRecords = buf.Bytes()
  214. default:
  215. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  216. }
  217. return nil
  218. }
  219. func (b *RecordBatch) computeAttributes() int16 {
  220. attr := int16(b.Codec) & int16(compressionCodecMask)
  221. if b.Control {
  222. attr |= controlMask
  223. }
  224. return attr
  225. }
// addRecord appends r to the batch's record set.
func (b *RecordBatch) addRecord(r *Record) {
	b.Records = append(b.Records, r)
}