// record_batch.go (5.6 KB)
package sarama

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io/ioutil"
	"time"

	"github.com/eapache/go-xerial-snappy"
	"github.com/pierrec/lz4"
)
// recordBatchOverhead is the byte size of the fixed v2 batch header that the
// batch-length field covers before the records payload begins:
// partitionLeaderEpoch(4) + magic(1) + crc(4) + attributes(2) +
// lastOffsetDelta(4) + firstTimestamp(8) + maxTimestamp(8) + producerID(8) +
// producerEpoch(2) + firstSequence(4) + recordCount(4) = 49.
const recordBatchOverhead = 49

// recordsArray adapts []*Record so a whole slice of records can be passed to
// the generic encode/decode helpers as a single encoder/decoder value.
type recordsArray []*Record
  13. func (e recordsArray) encode(pe packetEncoder) error {
  14. for _, r := range e {
  15. if err := r.encode(pe); err != nil {
  16. return err
  17. }
  18. }
  19. return nil
  20. }
  21. func (e recordsArray) decode(pd packetDecoder) error {
  22. for i := range e {
  23. rec := &Record{}
  24. if err := rec.decode(pd); err != nil {
  25. return err
  26. }
  27. e[i] = rec
  28. }
  29. return nil
  30. }
// RecordBatch is the in-memory form of a Kafka v2 ("magic 2") record batch:
// the batch header fields plus the records it contains.
type RecordBatch struct {
	FirstOffset          int64            // offset of the first record in the batch
	PartitionLeaderEpoch int32
	Version              int8             // batch format "magic"; encode only accepts 2
	Codec                CompressionCodec // compression applied to the records payload
	CompressionLevel     int              // codec-specific level; only consulted on the gzip path
	Control              bool             // mirrors the control bit of the attributes field
	LastOffsetDelta      int32
	FirstTimestamp       time.Time
	MaxTimestamp         time.Time
	ProducerID           int64
	ProducerEpoch        int16
	FirstSequence        int32
	Records              []*Record // decoded records; nil when decode hit a truncated payload
	PartialTrailingRecord bool     // set by decode when the batch was cut off mid-payload

	compressedRecords []byte // cached compressed payload built by encodeRecords
	recordsLen        int    // uncompressed records size
}
  49. func (b *RecordBatch) encode(pe packetEncoder) error {
  50. if b.Version != 2 {
  51. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  52. }
  53. pe.putInt64(b.FirstOffset)
  54. pe.push(&lengthField{})
  55. pe.putInt32(b.PartitionLeaderEpoch)
  56. pe.putInt8(b.Version)
  57. pe.push(newCRC32Field(crcCastagnoli))
  58. pe.putInt16(b.computeAttributes())
  59. pe.putInt32(b.LastOffsetDelta)
  60. if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
  61. return err
  62. }
  63. if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
  64. return err
  65. }
  66. pe.putInt64(b.ProducerID)
  67. pe.putInt16(b.ProducerEpoch)
  68. pe.putInt32(b.FirstSequence)
  69. if err := pe.putArrayLength(len(b.Records)); err != nil {
  70. return err
  71. }
  72. if b.compressedRecords == nil {
  73. if err := b.encodeRecords(pe); err != nil {
  74. return err
  75. }
  76. }
  77. if err := pe.putRawBytes(b.compressedRecords); err != nil {
  78. return err
  79. }
  80. if err := pe.pop(); err != nil {
  81. return err
  82. }
  83. return pe.pop()
  84. }
// decode reads a v2 record batch from pd, verifying the CRC, decompressing
// the payload per the attributes' codec bits, and decoding the records.
// A payload truncated by the broker's fetch size is not an error: the batch
// is returned with PartialTrailingRecord set and Records nil.
func (b *RecordBatch) decode(pd packetDecoder) (err error) {
	if b.FirstOffset, err = pd.getInt64(); err != nil {
		return err
	}

	// batchLen counts every byte after the length field itself.
	batchLen, err := pd.getInt32()
	if err != nil {
		return err
	}

	if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
		return err
	}

	if b.Version, err = pd.getInt8(); err != nil {
		return err
	}

	// The CRC (Castagnoli polynomial) covers everything from the attributes
	// field through the end of the records payload.
	if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
		return err
	}

	attributes, err := pd.getInt16()
	if err != nil {
		return err
	}
	// Low bits of the attributes select the codec; one bit flags control batches.
	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
	b.Control = attributes&controlMask == controlMask

	if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
		return err
	}

	if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
		return err
	}

	if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
		return err
	}

	if b.ProducerID, err = pd.getInt64(); err != nil {
		return err
	}

	if b.ProducerEpoch, err = pd.getInt16(); err != nil {
		return err
	}

	if b.FirstSequence, err = pd.getInt32(); err != nil {
		return err
	}

	numRecs, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	if numRecs >= 0 {
		b.Records = make([]*Record, numRecs)
	}

	// Everything left of the batch is the (possibly compressed) records blob.
	bufSize := int(batchLen) - recordBatchOverhead
	recBuffer, err := pd.getRawBytes(bufSize)
	if err != nil {
		if err == ErrInsufficientData {
			// The broker may truncate the last batch of a fetch response;
			// surface it as a partial batch rather than a decode failure.
			b.PartialTrailingRecord = true
			b.Records = nil
			return nil
		}
		return err
	}

	if err = pd.pop(); err != nil { // verify the CRC
		return err
	}

	switch b.Codec {
	case CompressionNone:
	case CompressionGZIP:
		reader, err := gzip.NewReader(bytes.NewReader(recBuffer))
		if err != nil {
			return err
		}
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	case CompressionSnappy:
		if recBuffer, err = snappy.Decode(recBuffer); err != nil {
			return err
		}
	case CompressionLZ4:
		reader := lz4.NewReader(bytes.NewReader(recBuffer))
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	default:
		return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
	}

	b.recordsLen = len(recBuffer)
	err = decode(recBuffer, recordsArray(b.Records))
	if err == ErrInsufficientData {
		// The truncation can also land inside the decompressed records.
		b.PartialTrailingRecord = true
		b.Records = nil
		return nil
	}
	return err
}
  177. func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
  178. var raw []byte
  179. var err error
  180. if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
  181. return err
  182. }
  183. b.recordsLen = len(raw)
  184. switch b.Codec {
  185. case CompressionNone:
  186. b.compressedRecords = raw
  187. case CompressionGZIP:
  188. var buf bytes.Buffer
  189. var writer *gzip.Writer
  190. if b.CompressionLevel != CompressionLevelDefault {
  191. writer, err = gzip.NewWriterLevel(&buf, b.CompressionLevel)
  192. if err != nil {
  193. return err
  194. }
  195. } else {
  196. writer = gzip.NewWriter(&buf)
  197. }
  198. if _, err := writer.Write(raw); err != nil {
  199. return err
  200. }
  201. if err := writer.Close(); err != nil {
  202. return err
  203. }
  204. b.compressedRecords = buf.Bytes()
  205. case CompressionSnappy:
  206. b.compressedRecords = snappy.Encode(raw)
  207. case CompressionLZ4:
  208. var buf bytes.Buffer
  209. writer := lz4.NewWriter(&buf)
  210. if _, err := writer.Write(raw); err != nil {
  211. return err
  212. }
  213. if err := writer.Close(); err != nil {
  214. return err
  215. }
  216. b.compressedRecords = buf.Bytes()
  217. default:
  218. return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
  219. }
  220. return nil
  221. }
  222. func (b *RecordBatch) computeAttributes() int16 {
  223. attr := int16(b.Codec) & int16(compressionCodecMask)
  224. if b.Control {
  225. attr |= controlMask
  226. }
  227. return attr
  228. }
// addRecord appends r to the batch's record list. It does not update the
// header fields (LastOffsetDelta, timestamps, ...) or invalidate any cached
// compressed payload.
func (b *RecordBatch) addRecord(r *Record) {
	b.Records = append(b.Records, r)
}