message.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package kafka
  2. type compressionCodec int
  3. const (
  4. COMPRESSION_NONE compressionCodec = 0
  5. COMPRESSION_GZIP compressionCodec = 1
  6. COMPRESSION_SNAPPY compressionCodec = 2
  7. )
  8. // The spec just says: "This is a version id used to allow backwards compatible evolution of the message
  9. // binary format." but it doesn't say what the current value is, so presumably 0...
  10. const MESSAGE_FORMAT int8 = 0
  11. type message struct {
  12. codec compressionCodec
  13. key *[]byte
  14. value *[]byte
  15. }
  16. func (m *message) encode(pe packetEncoder) {
  17. pe.pushCRC32()
  18. pe.putInt8(MESSAGE_FORMAT)
  19. var attributes int8 = 0
  20. attributes |= int8(m.codec & 0x07)
  21. pe.putInt8(attributes)
  22. pe.putBytes(m.key)
  23. pe.putBytes(m.value)
  24. pe.pop()
  25. }
  26. func (m *message) decode(pd packetDecoder) (err error) {
  27. err = pd.pushCRC32()
  28. if err != nil {
  29. return err
  30. }
  31. format, err := pd.getInt8()
  32. if err != nil {
  33. return err
  34. }
  35. if format != MESSAGE_FORMAT {
  36. return DecodingError{}
  37. }
  38. attribute, err := pd.getInt8()
  39. if err != nil {
  40. return err
  41. }
  42. m.codec = compressionCodec(attribute & 0x07)
  43. m.key, err = pd.getBytes()
  44. if err != nil {
  45. return err
  46. }
  47. m.value, err = pd.getBytes()
  48. if err != nil {
  49. return err
  50. }
  51. err = pd.pop()
  52. if err != nil {
  53. return err
  54. }
  55. return nil
  56. }