record.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "time"
  5. )
  6. const (
  7. controlMask = 0x20
  8. maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
  9. )
  10. //RecordHeader stores key and value for a record header
  11. type RecordHeader struct {
  12. Key []byte
  13. Value []byte
  14. }
  15. func (h *RecordHeader) encode(pe packetEncoder) error {
  16. if err := pe.putVarintBytes(h.Key); err != nil {
  17. return err
  18. }
  19. return pe.putVarintBytes(h.Value)
  20. }
  21. func (h *RecordHeader) decode(pd packetDecoder) (err error) {
  22. if h.Key, err = pd.getVarintBytes(); err != nil {
  23. return err
  24. }
  25. if h.Value, err = pd.getVarintBytes(); err != nil {
  26. return err
  27. }
  28. return nil
  29. }
  30. //Record is kafka record type
  31. type Record struct {
  32. Headers []*RecordHeader
  33. Attributes int8
  34. TimestampDelta time.Duration
  35. OffsetDelta int64
  36. Key []byte
  37. Value []byte
  38. length varintLengthField
  39. }
  40. func (r *Record) encode(pe packetEncoder) error {
  41. pe.push(&r.length)
  42. pe.putInt8(r.Attributes)
  43. pe.putVarint(int64(r.TimestampDelta / time.Millisecond))
  44. pe.putVarint(r.OffsetDelta)
  45. if err := pe.putVarintBytes(r.Key); err != nil {
  46. return err
  47. }
  48. if err := pe.putVarintBytes(r.Value); err != nil {
  49. return err
  50. }
  51. pe.putVarint(int64(len(r.Headers)))
  52. for _, h := range r.Headers {
  53. if err := h.encode(pe); err != nil {
  54. return err
  55. }
  56. }
  57. return pe.pop()
  58. }
  59. func (r *Record) decode(pd packetDecoder) (err error) {
  60. if err = pd.push(&r.length); err != nil {
  61. return err
  62. }
  63. if r.Attributes, err = pd.getInt8(); err != nil {
  64. return err
  65. }
  66. timestamp, err := pd.getVarint()
  67. if err != nil {
  68. return err
  69. }
  70. r.TimestampDelta = time.Duration(timestamp) * time.Millisecond
  71. if r.OffsetDelta, err = pd.getVarint(); err != nil {
  72. return err
  73. }
  74. if r.Key, err = pd.getVarintBytes(); err != nil {
  75. return err
  76. }
  77. if r.Value, err = pd.getVarintBytes(); err != nil {
  78. return err
  79. }
  80. numHeaders, err := pd.getVarint()
  81. if err != nil {
  82. return err
  83. }
  84. if numHeaders >= 0 {
  85. r.Headers = make([]*RecordHeader, numHeaders)
  86. }
  87. for i := int64(0); i < numHeaders; i++ {
  88. hdr := new(RecordHeader)
  89. if err := hdr.decode(pd); err != nil {
  90. return err
  91. }
  92. r.Headers[i] = hdr
  93. }
  94. return pd.pop()
  95. }