message.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package kafka
  // compressionCodec names the compression scheme carried in a message's
  // attributes byte.
  type compressionCodec int

  // Known codecs, numbered 0, 1 and 2 in declaration order.
  const (
  	COMPRESSION_NONE compressionCodec = iota
  	COMPRESSION_GZIP
  	COMPRESSION_SNAPPY
  )
  // MESSAGE_FORMAT is the format version byte written by encode and required by
  // decode. The spec describes it only as "a version id used to allow backwards
  // compatible evolution of the message binary format" and never states the
  // current value, so 0 is presumed.
  const MESSAGE_FORMAT = int8(0)
  11. type message struct {
  12. codec compressionCodec
  13. key *[]byte
  14. value *[]byte
  15. }
  16. func (m *message) encode(pe packetEncoder) {
  17. pe.pushCRC32()
  18. pe.putInt8(MESSAGE_FORMAT)
  19. var attributes int8 = 0
  20. attributes |= int8(m.codec & 0x07)
  21. pe.putInt8(attributes)
  22. pe.putBytes(m.key)
  23. pe.putBytes(m.value)
  24. pe.pop()
  25. }
  26. func (m *message) decode(pd packetDecoder) (err error) {
  27. err = pd.pushCRC32()
  28. if err != nil {
  29. return err
  30. }
  31. format, err := pd.getInt8()
  32. if err != nil {
  33. return err
  34. }
  35. if format != MESSAGE_FORMAT {
  36. return DecodingError{}
  37. }
  38. attribute, err := pd.getInt8()
  39. if err != nil {
  40. return err
  41. }
  42. m.codec = compressionCodec(attribute & 0x07)
  43. m.key, err = pd.getBytes()
  44. if err != nil {
  45. return err
  46. }
  47. m.value, err = pd.getBytes()
  48. if err != nil {
  49. return err
  50. }
  51. err = pd.pop()
  52. if err != nil {
  53. return err
  54. }
  55. return nil
  56. }
  57. func newMessageFromString(in string) *message {
  58. buf := make([]byte, len(in))
  59. return &message{value: &buf}
  60. }