encoder_decoder.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package sarama
  2. import "fmt"
  3. // Encoder is the interface that wraps the basic Encode method.
  4. // Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
  5. type encoder interface {
  6. encode(pe packetEncoder) error
  7. }
  8. // Encode takes an Encoder and turns it into bytes.
  9. func encode(e encoder) ([]byte, error) {
  10. if e == nil {
  11. return nil, nil
  12. }
  13. var prepEnc prepEncoder
  14. var realEnc realEncoder
  15. err := e.encode(&prepEnc)
  16. if err != nil {
  17. return nil, err
  18. }
  19. if prepEnc.length < 0 || prepEnc.length > int(MaxRequestSize) {
  20. return nil, PacketEncodingError{fmt.Sprintf("invalid request size (%d)", prepEnc.length)}
  21. }
  22. realEnc.raw = make([]byte, prepEnc.length)
  23. err = e.encode(&realEnc)
  24. if err != nil {
  25. return nil, err
  26. }
  27. return realEnc.raw, nil
  28. }
  29. // Decoder is the interface that wraps the basic Decode method.
  30. // Anything implementing Decoder can be extracted from bytes using Kafka's encoding rules.
  31. type decoder interface {
  32. decode(pd packetDecoder) error
  33. }
  34. type versionedDecoder interface {
  35. decode(pd packetDecoder, version int16) error
  36. }
  37. // Decode takes bytes and a Decoder and fills the fields of the decoder from the bytes,
  38. // interpreted using Kafka's encoding rules.
  39. func decode(buf []byte, in decoder) error {
  40. if buf == nil {
  41. return nil
  42. }
  43. helper := realDecoder{raw: buf}
  44. err := in.decode(&helper)
  45. if err != nil {
  46. return err
  47. }
  48. if helper.off != len(buf) {
  49. return PacketDecodingError{"invalid length"}
  50. }
  51. return nil
  52. }
  53. func versionedDecode(buf []byte, in versionedDecoder, version int16) error {
  54. if buf == nil {
  55. return nil
  56. }
  57. helper := realDecoder{raw: buf}
  58. err := in.decode(&helper, version)
  59. if err != nil {
  60. return err
  61. }
  62. if helper.off != len(buf) {
  63. return PacketDecodingError{"invalid length"}
  64. }
  65. return nil
  66. }