message.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package sarama
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "io/ioutil"
  7. )
  8. // CompressionCodec represents the various compression codecs recognized by Kafka in messages.
  9. type CompressionCodec int8
  10. // only the last two bits are really used
  11. const compressionCodecMask int8 = 0x03
  12. const (
  13. CompressionNone CompressionCodec = 0
  14. CompressionGZIP CompressionCodec = 1
  15. CompressionSnappy CompressionCodec = 2
  16. )
  17. // The spec just says: "This is a version id used to allow backwards compatible evolution of the message
  18. // binary format." but it doesn't say what the current value is, so presumably 0...
  19. const messageFormat int8 = 0
  20. type Message struct {
  21. Codec CompressionCodec // codec used to compress the message contents
  22. Key []byte // the message key, may be nil
  23. Value []byte // the message contents
  24. Set *MessageSet // the message set a message might wrap
  25. compressedCache []byte
  26. }
  27. func (m *Message) encode(pe packetEncoder) error {
  28. pe.push(&crc32Field{})
  29. pe.putInt8(messageFormat)
  30. attributes := int8(m.Codec) & compressionCodecMask
  31. pe.putInt8(attributes)
  32. err := pe.putBytes(m.Key)
  33. if err != nil {
  34. return err
  35. }
  36. var payload []byte
  37. if m.compressedCache != nil {
  38. payload = m.compressedCache
  39. m.compressedCache = nil
  40. } else {
  41. switch m.Codec {
  42. case CompressionNone:
  43. payload = m.Value
  44. case CompressionGZIP:
  45. var buf bytes.Buffer
  46. writer := gzip.NewWriter(&buf)
  47. if _, err = writer.Write(m.Value); err != nil {
  48. return err
  49. }
  50. if err = writer.Close(); err != nil {
  51. return err
  52. }
  53. m.compressedCache = buf.Bytes()
  54. payload = m.compressedCache
  55. case CompressionSnappy:
  56. tmp, err := snappyEncode(m.Value)
  57. if err != nil {
  58. return err
  59. }
  60. m.compressedCache = tmp
  61. payload = m.compressedCache
  62. default:
  63. return PacketEncodingError{fmt.Sprintf("Unsupported compression codec: %d", m.Codec)}
  64. }
  65. }
  66. if err = pe.putBytes(payload); err != nil {
  67. return err
  68. }
  69. return pe.pop()
  70. }
  71. func (m *Message) decode(pd packetDecoder) (err error) {
  72. err = pd.push(&crc32Field{})
  73. if err != nil {
  74. return err
  75. }
  76. format, err := pd.getInt8()
  77. if err != nil {
  78. return err
  79. }
  80. if format != messageFormat {
  81. return PacketDecodingError{"Unexpected messageFormat"}
  82. }
  83. attribute, err := pd.getInt8()
  84. if err != nil {
  85. return err
  86. }
  87. m.Codec = CompressionCodec(attribute & compressionCodecMask)
  88. m.Key, err = pd.getBytes()
  89. if err != nil {
  90. return err
  91. }
  92. m.Value, err = pd.getBytes()
  93. if err != nil {
  94. return err
  95. }
  96. switch m.Codec {
  97. case CompressionNone:
  98. // nothing to do
  99. case CompressionGZIP:
  100. if m.Value == nil {
  101. return PacketDecodingError{"GZIP compression specified, but no data to uncompress"}
  102. }
  103. reader, err := gzip.NewReader(bytes.NewReader(m.Value))
  104. if err != nil {
  105. return err
  106. }
  107. if m.Value, err = ioutil.ReadAll(reader); err != nil {
  108. return err
  109. }
  110. return m.decodeSet()
  111. case CompressionSnappy:
  112. if m.Value == nil {
  113. return PacketDecodingError{"Snappy compression specified, but no data to uncompress"}
  114. }
  115. if m.Value, err = snappyDecode(m.Value); err != nil {
  116. return err
  117. }
  118. return m.decodeSet()
  119. default:
  120. return PacketDecodingError{fmt.Sprintf("Invalid compression specified: %d", m.Codec)}
  121. }
  122. err = pd.pop()
  123. if err != nil {
  124. return err
  125. }
  126. return nil
  127. }
  128. // decodes a message set from a previousy encoded bulk-message
  129. func (m *Message) decodeSet() (err error) {
  130. pd := realDecoder{raw: m.Value}
  131. m.Set = &MessageSet{}
  132. return m.Set.decode(&pd)
  133. }