message.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package protocol
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "io/ioutil"
  6. "sarama/types"
  7. )
  8. // The spec just says: "This is a version id used to allow backwards compatible evolution of the message
  9. // binary format." but it doesn't say what the current value is, so presumably 0...
  10. const message_format int8 = 0
  11. type Message struct {
  12. Codec types.CompressionCodec // codec used to compress the message contents
  13. Key []byte // the message key, may be nil
  14. Value []byte // the message contents
  15. }
  16. func (m *Message) encode(pe packetEncoder) {
  17. pe.pushCRC32()
  18. pe.putInt8(message_format)
  19. var attributes int8 = 0
  20. attributes |= m.Codec & 0x07
  21. pe.putInt8(attributes)
  22. pe.putBytes(m.Key)
  23. var body []byte
  24. switch m.Codec {
  25. case COMPRESSION_NONE:
  26. body = m.Value
  27. case COMPRESSION_GZIP:
  28. if m.Value != nil {
  29. var buf bytes.Buffer
  30. writer := gzip.NewWriter(&buf)
  31. writer.Write(m.Value)
  32. writer.Close()
  33. body = buf.Bytes()
  34. }
  35. case COMPRESSION_SNAPPY:
  36. // TODO
  37. }
  38. pe.putBytes(body)
  39. pe.pop()
  40. }
  41. func (m *Message) decode(pd packetDecoder) (err error) {
  42. err = pd.pushCRC32()
  43. if err != nil {
  44. return err
  45. }
  46. format, err := pd.getInt8()
  47. if err != nil {
  48. return err
  49. }
  50. if format != message_format {
  51. return DecodingError("Message format mismatch.")
  52. }
  53. attribute, err := pd.getInt8()
  54. if err != nil {
  55. return err
  56. }
  57. m.Codec = attribute & 0x07
  58. m.Key, err = pd.getBytes()
  59. if err != nil {
  60. return err
  61. }
  62. m.Value, err = pd.getBytes()
  63. if err != nil {
  64. return err
  65. }
  66. switch m.Codec {
  67. case COMPRESSION_NONE:
  68. // nothing to do
  69. case COMPRESSION_GZIP:
  70. if m.Value == nil {
  71. return DecodingError("Nil contents cannot be compressed.")
  72. }
  73. reader, err := gzip.NewReader(bytes.NewReader(m.Value))
  74. if err != nil {
  75. return err
  76. }
  77. m.Value, err = ioutil.ReadAll(reader)
  78. if err != nil {
  79. return err
  80. }
  81. case COMPRESSION_SNAPPY:
  82. // TODO
  83. default:
  84. return DecodingError("Unknown compression codec.")
  85. }
  86. err = pd.pop()
  87. if err != nil {
  88. return err
  89. }
  90. return nil
  91. }