message.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package kafka
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "io/ioutil"
  6. )
  // compressionCodec identifies how the contents of a Message are compressed.
  type compressionCodec int

  // Supported codecs. The numeric value of each codec is what Message.encode
  // masks into the attributes byte and what Message.decode reads back from it,
  // so the order of these constants must not change.
  const (
  	COMPRESSION_NONE   compressionCodec = iota // 0
  	COMPRESSION_GZIP                           // 1
  	COMPRESSION_SNAPPY                         // 2
  )
  // message_format is the version id that encode writes at the start of every
  // message and that decode checks. The protocol spec describes the field only
  // as "a version id used to allow backwards compatible evolution of the message
  // binary format" without stating its current value, so 0 is assumed here.
  const message_format = int8(0)
  16. type Message struct {
  17. Codec compressionCodec // how to compress the contents of the message
  18. Key []byte // the message key, may be nil
  19. Value []byte // the message contents
  20. }
  21. func (m *Message) encode(pe packetEncoder) {
  22. pe.pushCRC32()
  23. pe.putInt8(message_format)
  24. var attributes int8 = 0
  25. attributes |= int8(m.Codec & 0x07)
  26. pe.putInt8(attributes)
  27. pe.putBytes(m.Key)
  28. var body []byte
  29. switch m.Codec {
  30. case COMPRESSION_NONE:
  31. body = m.Value
  32. case COMPRESSION_GZIP:
  33. if m.Value != nil {
  34. var buf bytes.Buffer
  35. writer := gzip.NewWriter(&buf)
  36. writer.Write(m.Value)
  37. writer.Close()
  38. body = buf.Bytes()
  39. }
  40. case COMPRESSION_SNAPPY:
  41. // TODO
  42. }
  43. pe.putBytes(body)
  44. pe.pop()
  45. }
  46. func (m *Message) decode(pd packetDecoder) (err error) {
  47. err = pd.pushCRC32()
  48. if err != nil {
  49. return err
  50. }
  51. format, err := pd.getInt8()
  52. if err != nil {
  53. return err
  54. }
  55. if format != message_format {
  56. return DecodingError("Message format mismatch.")
  57. }
  58. attribute, err := pd.getInt8()
  59. if err != nil {
  60. return err
  61. }
  62. m.Codec = compressionCodec(attribute & 0x07)
  63. m.Key, err = pd.getBytes()
  64. if err != nil {
  65. return err
  66. }
  67. m.Value, err = pd.getBytes()
  68. if err != nil {
  69. return err
  70. }
  71. switch m.Codec {
  72. case COMPRESSION_NONE:
  73. // nothing to do
  74. case COMPRESSION_GZIP:
  75. if m.Value == nil {
  76. return DecodingError("Nil contents cannot be compressed.")
  77. }
  78. reader, err := gzip.NewReader(bytes.NewReader(m.Value))
  79. if err != nil {
  80. return err
  81. }
  82. m.Value, err = ioutil.ReadAll(reader)
  83. if err != nil {
  84. return err
  85. }
  86. case COMPRESSION_SNAPPY:
  87. // TODO
  88. default:
  89. return DecodingError("Unknown compression codec.")
  90. }
  91. err = pd.pop()
  92. if err != nil {
  93. return err
  94. }
  95. return nil
  96. }