message.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package protocol
  2. import enc "sarama/encoding"
  3. import (
  4. "bytes"
  5. "compress/gzip"
  6. "io/ioutil"
  7. "sarama/types"
  8. )
  9. // The spec just says: "This is a version id used to allow backwards compatible evolution of the message
  10. // binary format." but it doesn't say what the current value is, so presumably 0...
  11. const message_format int8 = 0
  12. type Message struct {
  13. Codec types.CompressionCodec // codec used to compress the message contents
  14. Key []byte // the message key, may be nil
  15. Value []byte // the message contents
  16. }
  17. func (m *Message) Encode(pe enc.PacketEncoder) error {
  18. pe.Push(&enc.CRC32Field{})
  19. pe.PutInt8(message_format)
  20. var attributes int8 = 0
  21. attributes |= int8(m.Codec) & 0x07
  22. pe.PutInt8(attributes)
  23. err := pe.PutBytes(m.Key)
  24. if err != nil {
  25. return err
  26. }
  27. var body []byte
  28. switch m.Codec {
  29. case types.COMPRESSION_NONE:
  30. body = m.Value
  31. case types.COMPRESSION_GZIP:
  32. if m.Value != nil {
  33. var buf bytes.Buffer
  34. writer := gzip.NewWriter(&buf)
  35. writer.Write(m.Value)
  36. writer.Close()
  37. body = buf.Bytes()
  38. }
  39. case types.COMPRESSION_SNAPPY:
  40. // TODO
  41. }
  42. err = pe.PutBytes(body)
  43. if err != nil {
  44. return err
  45. }
  46. return pe.Pop()
  47. }
  48. func (m *Message) Decode(pd enc.PacketDecoder) (err error) {
  49. err = pd.Push(&enc.CRC32Field{})
  50. if err != nil {
  51. return err
  52. }
  53. format, err := pd.GetInt8()
  54. if err != nil {
  55. return err
  56. }
  57. if format != message_format {
  58. return enc.DecodingError
  59. }
  60. attribute, err := pd.GetInt8()
  61. if err != nil {
  62. return err
  63. }
  64. m.Codec = types.CompressionCodec(attribute & 0x07)
  65. m.Key, err = pd.GetBytes()
  66. if err != nil {
  67. return err
  68. }
  69. m.Value, err = pd.GetBytes()
  70. if err != nil {
  71. return err
  72. }
  73. switch m.Codec {
  74. case types.COMPRESSION_NONE:
  75. // nothing to do
  76. case types.COMPRESSION_GZIP:
  77. if m.Value == nil {
  78. return enc.DecodingError
  79. }
  80. reader, err := gzip.NewReader(bytes.NewReader(m.Value))
  81. if err != nil {
  82. return err
  83. }
  84. m.Value, err = ioutil.ReadAll(reader)
  85. if err != nil {
  86. return err
  87. }
  88. case types.COMPRESSION_SNAPPY:
  89. // TODO
  90. default:
  91. return enc.DecodingError
  92. }
  93. err = pd.Pop()
  94. if err != nil {
  95. return err
  96. }
  97. return nil
  98. }