message.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package kafka
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "io/ioutil"
  6. )
  // compressionCodec identifies how a message's value is compressed. encode
  // stores it in the low three bits (mask 0x07) of the attributes byte and
  // decode reads it back from the same bits.
  type compressionCodec int

  // Codec values as they appear in the attributes byte.
  const (
  	// COMPRESSION_NONE leaves the value uncompressed.
  	COMPRESSION_NONE = compressionCodec(0)
  	// COMPRESSION_GZIP compresses the value with compress/gzip.
  	COMPRESSION_GZIP = compressionCodec(1)
  	// COMPRESSION_SNAPPY is declared, but encode and decode in this file
  	// still mark its handling as TODO.
  	COMPRESSION_SNAPPY = compressionCodec(2)
  )
  // message_format is the format version byte that encode writes and that
  // decode requires when reading a message. The spec only says: "This is a
  // version id used to allow backwards compatible evolution of the message
  // binary format." It does not state the current value, so 0 is presumed.
  const message_format = int8(0)
  16. type message struct {
  17. codec compressionCodec
  18. key *[]byte
  19. value *[]byte
  20. }
  21. func (m *message) encode(pe packetEncoder) {
  22. pe.pushCRC32()
  23. pe.putInt8(message_format)
  24. var attributes int8 = 0
  25. attributes |= int8(m.codec & 0x07)
  26. pe.putInt8(attributes)
  27. pe.putBytes(m.key)
  28. var body *[]byte
  29. switch m.codec {
  30. case COMPRESSION_NONE:
  31. body = m.value
  32. case COMPRESSION_GZIP:
  33. if m.value != nil {
  34. var buf bytes.Buffer
  35. writer := gzip.NewWriter(&buf)
  36. writer.Write(*m.value)
  37. writer.Close()
  38. tmp := buf.Bytes()
  39. body = &tmp
  40. }
  41. case COMPRESSION_SNAPPY:
  42. // TODO
  43. }
  44. pe.putBytes(body)
  45. pe.pop()
  46. }
  47. func (m *message) decode(pd packetDecoder) (err error) {
  48. err = pd.pushCRC32()
  49. if err != nil {
  50. return err
  51. }
  52. format, err := pd.getInt8()
  53. if err != nil {
  54. return err
  55. }
  56. if format != message_format {
  57. return DecodingError{}
  58. }
  59. attribute, err := pd.getInt8()
  60. if err != nil {
  61. return err
  62. }
  63. m.codec = compressionCodec(attribute & 0x07)
  64. m.key, err = pd.getBytes()
  65. if err != nil {
  66. return err
  67. }
  68. m.value, err = pd.getBytes()
  69. if err != nil {
  70. return err
  71. }
  72. switch m.codec {
  73. case COMPRESSION_NONE:
  74. // nothing to do
  75. case COMPRESSION_GZIP:
  76. if m.value == nil {
  77. return DecodingError{"Nil contents cannot be compressed."}
  78. }
  79. reader, err := gzip.NewReader(bytes.NewReader(*m.value))
  80. if err != nil {
  81. return err
  82. }
  83. tmp, err := ioutil.ReadAll(reader)
  84. if err != nil {
  85. return err
  86. }
  87. m.value = &tmp
  88. case COMPRESSION_SNAPPY:
  89. // TODO
  90. default:
  91. return DecodingError{"Unknown compression codec."}
  92. }
  93. err = pd.pop()
  94. if err != nil {
  95. return err
  96. }
  97. return nil
  98. }
  99. func newMessage(key, value encoder) (msg *message, err error) {
  100. msg = new(message)
  101. msg.key, err = buildBytes(key)
  102. if err != nil {
  103. return nil, err
  104. }
  105. msg.value, err = buildBytes(value)
  106. if err != nil {
  107. return nil, err
  108. }
  109. return msg, nil
  110. }