encoder_decoder.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package sarama
  2. import "fmt"
  3. // Encoder is the interface that wraps the basic Encode method.
  4. // Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
  5. type encoder interface {
  6. encode(pe packetEncoder) error
  7. }
  8. // Encode takes an Encoder and turns it into bytes.
  9. func encode(e encoder) ([]byte, error) {
  10. if e == nil {
  11. return nil, nil
  12. }
  13. var prepEnc prepEncoder
  14. var realEnc realEncoder
  15. err := e.encode(&prepEnc)
  16. if err != nil {
  17. return nil, err
  18. }
  19. if prepEnc.length < 0 || prepEnc.length > int(MaxRequestSize) {
  20. return nil, PacketEncodingError{fmt.Sprintf("invalid request size (%d)", prepEnc.length)}
  21. }
  22. realEnc.raw = make([]byte, prepEnc.length)
  23. err = e.encode(&realEnc)
  24. if err != nil {
  25. return nil, err
  26. }
  27. return realEnc.raw, nil
  28. }
  29. // Decoder is the interface that wraps the basic Decode method.
  30. // Anything implementing Decoder can be extracted from bytes using Kafka's encoding rules.
  31. type decoder interface {
  32. decode(pd packetDecoder) error
  33. }
  34. // Decode takes bytes and a Decoder and fills the fields of the decoder from the bytes,
  35. // interpreted using Kafka's encoding rules.
  36. func decode(buf []byte, in decoder) error {
  37. if buf == nil {
  38. return nil
  39. }
  40. helper := realDecoder{raw: buf}
  41. err := in.decode(&helper)
  42. if err != nil {
  43. return err
  44. }
  45. if helper.off != len(buf) {
  46. return PacketDecodingError{"invalid length"}
  47. }
  48. return nil
  49. }