// record_batch.go
  1. package sarama
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "io/ioutil"
  7. "time"
  8. "github.com/eapache/go-xerial-snappy"
  9. "github.com/pierrec/lz4"
  10. )
// recordBatchOverhead is the number of header bytes counted by the batch-length
// field that precede the records themselves: partition leader epoch (4) +
// magic (1) + CRC (4) + attributes (2) + last offset delta (4) + first
// timestamp (8) + max timestamp (8) + producer ID (8) + producer epoch (2) +
// first sequence (4) + record count (4) = 49.
const recordBatchOverhead = 49
  12. type recordsArray []*Record
  13. func (e recordsArray) encode(pe packetEncoder) error {
  14. for _, r := range e {
  15. if err := r.encode(pe); err != nil {
  16. return err
  17. }
  18. }
  19. return nil
  20. }
  21. func (e recordsArray) decode(pd packetDecoder) error {
  22. for i := range e {
  23. rec := &Record{}
  24. if err := rec.decode(pd); err != nil {
  25. return err
  26. }
  27. e[i] = rec
  28. }
  29. return nil
  30. }
// RecordBatch represents a Kafka record batch in the v2 ("magic byte 2")
// on-wire format: a checksummed header followed by an optionally compressed
// sequence of records.
type RecordBatch struct {
	FirstOffset          int64 // base offset of the batch
	PartitionLeaderEpoch int32
	Version              int8             // magic byte; only version 2 is handled by encode
	Codec                CompressionCodec // compression applied to the record payload
	Control              bool             // true when the batch carries control records
	LastOffsetDelta      int32
	FirstTimestamp       time.Time
	MaxTimestamp         time.Time
	ProducerID           int64
	ProducerEpoch        int16
	FirstSequence        int32
	Records              []*Record
	PartialTrailingRecord bool // set by decode when the payload ends mid-record (truncated fetch)

	compressedRecords []byte // cached compressed payload, populated lazily by encodeRecords
	recordsLen        int    // uncompressed records size
}
  48. func (b *RecordBatch) encode(pe packetEncoder) error {
  49. if b.Version != 2 {
  50. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  51. }
  52. pe.putInt64(b.FirstOffset)
  53. pe.push(&lengthField{})
  54. pe.putInt32(b.PartitionLeaderEpoch)
  55. pe.putInt8(b.Version)
  56. pe.push(newCRC32Field(crcCastagnoli))
  57. pe.putInt16(b.computeAttributes())
  58. pe.putInt32(b.LastOffsetDelta)
  59. if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
  60. return err
  61. }
  62. if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
  63. return err
  64. }
  65. pe.putInt64(b.ProducerID)
  66. pe.putInt16(b.ProducerEpoch)
  67. pe.putInt32(b.FirstSequence)
  68. if err := pe.putArrayLength(len(b.Records)); err != nil {
  69. return err
  70. }
  71. if b.compressedRecords == nil {
  72. if err := b.encodeRecords(pe); err != nil {
  73. return err
  74. }
  75. }
  76. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  77. return err
  78. }
  79. if err := pe.pop(); err != nil {
  80. return err
  81. }
  82. return pe.pop()
  83. }
// decode reads a v2 record batch from pd, verifying the header CRC and
// decompressing the record payload as needed. A payload that ends mid-record
// (common in truncated fetch responses) is not an error: PartialTrailingRecord
// is set and Records is cleared instead.
func (b *RecordBatch) decode(pd packetDecoder) (err error) {
	if b.FirstOffset, err = pd.getInt64(); err != nil {
		return err
	}

	// batchLen counts every byte that follows this field; the raw record
	// payload size is derived from it further down.
	batchLen, err := pd.getInt32()
	if err != nil {
		return err
	}

	if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
		return err
	}

	if b.Version, err = pd.getInt8(); err != nil {
		return err
	}

	// CRC32C (Castagnoli) covers everything from the attributes through the
	// end of the batch; checked when popped below.
	if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
		return err
	}

	attributes, err := pd.getInt16()
	if err != nil {
		return err
	}
	// Low bits carry the codec; the control flag is a single attribute bit.
	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
	b.Control = attributes&controlMask == controlMask

	if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
		return err
	}

	if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
		return err
	}

	if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
		return err
	}

	if b.ProducerID, err = pd.getInt64(); err != nil {
		return err
	}

	if b.ProducerEpoch, err = pd.getInt16(); err != nil {
		return err
	}

	if b.FirstSequence, err = pd.getInt32(); err != nil {
		return err
	}

	numRecs, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	// A negative count can appear on the wire; only pre-size for valid counts.
	if numRecs >= 0 {
		b.Records = make([]*Record, numRecs)
	}

	// Everything after the fixed-size header is the (possibly compressed)
	// record payload.
	bufSize := int(batchLen) - recordBatchOverhead
	recBuffer, err := pd.getRawBytes(bufSize)
	if err != nil {
		return err
	}

	if err = pd.pop(); err != nil { // verify the CRC
		return err
	}

	switch b.Codec {
	case CompressionNone:
	case CompressionGZIP:
		reader, err := gzip.NewReader(bytes.NewReader(recBuffer))
		if err != nil {
			return err
		}
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	case CompressionSnappy:
		if recBuffer, err = snappy.Decode(recBuffer); err != nil {
			return err
		}
	case CompressionLZ4:
		reader := lz4.NewReader(bytes.NewReader(recBuffer))
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	default:
		return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
	}

	b.recordsLen = len(recBuffer)
	err = decode(recBuffer, recordsArray(b.Records))
	if err == ErrInsufficientData {
		// Truncated trailing record: report the batch as partial rather than
		// failing the whole decode.
		b.PartialTrailingRecord = true
		b.Records = nil
		return nil
	}
	return err
}
  171. func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
  172. var raw []byte
  173. if b.Codec != CompressionNone {
  174. var err error
  175. if raw, err = encode(recordsArray(b.Records), nil); err != nil {
  176. return err
  177. }
  178. b.recordsLen = len(raw)
  179. }
  180. switch b.Codec {
  181. case CompressionNone:
  182. offset := pe.offset()
  183. if err := recordsArray(b.Records).encode(pe); err != nil {
  184. return err
  185. }
  186. b.recordsLen = pe.offset() - offset
  187. case CompressionGZIP:
  188. var buf bytes.Buffer
  189. writer := gzip.NewWriter(&buf)
  190. if _, err := writer.Write(raw); err != nil {
  191. return err
  192. }
  193. if err := writer.Close(); err != nil {
  194. return err
  195. }
  196. b.compressedRecords = buf.Bytes()
  197. case CompressionSnappy:
  198. b.compressedRecords = snappy.Encode(raw)
  199. case CompressionLZ4:
  200. var buf bytes.Buffer
  201. writer := lz4.NewWriter(&buf)
  202. if _, err := writer.Write(raw); err != nil {
  203. return err
  204. }
  205. if err := writer.Close(); err != nil {
  206. return err
  207. }
  208. b.compressedRecords = buf.Bytes()
  209. default:
  210. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  211. }
  212. return nil
  213. }
  214. func (b *RecordBatch) computeAttributes() int16 {
  215. attr := int16(b.Codec) & int16(compressionCodecMask)
  216. if b.Control {
  217. attr |= controlMask
  218. }
  219. return attr
  220. }
  221. func (b *RecordBatch) addRecord(r *Record) {
  222. b.Records = append(b.Records, r)
  223. }