encoder_decoder.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package sarama
  2. import (
  3. "fmt"
  4. "github.com/rcrowley/go-metrics"
  5. )
  6. // Encoder is the interface that wraps the basic Encode method.
  7. // Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
  8. type encoder interface {
  9. encode(pe packetEncoder) error
  10. }
  11. // Encode takes an Encoder and turns it into bytes while potentially recording metrics.
  12. func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) {
  13. if e == nil {
  14. return nil, nil
  15. }
  16. var prepEnc prepEncoder
  17. var realEnc realEncoder
  18. err := e.encode(&prepEnc)
  19. if err != nil {
  20. return nil, err
  21. }
  22. if prepEnc.length < 0 || prepEnc.length > int(MaxRequestSize) {
  23. return nil, PacketEncodingError{fmt.Sprintf("invalid request size (%d)", prepEnc.length)}
  24. }
  25. realEnc.raw = make([]byte, prepEnc.length)
  26. realEnc.registry = metricRegistry
  27. err = e.encode(&realEnc)
  28. if err != nil {
  29. return nil, err
  30. }
  31. return realEnc.raw, nil
  32. }
  33. // Decoder is the interface that wraps the basic Decode method.
  34. // Anything implementing Decoder can be extracted from bytes using Kafka's encoding rules.
  35. type decoder interface {
  36. decode(pd packetDecoder) error
  37. }
  38. type versionedDecoder interface {
  39. decode(pd packetDecoder, version int16) error
  40. }
  41. // Decode takes bytes and a Decoder and fills the fields of the decoder from the bytes,
  42. // interpreted using Kafka's encoding rules.
  43. func decode(buf []byte, in decoder) error {
  44. if buf == nil {
  45. return nil
  46. }
  47. helper := realDecoder{raw: buf}
  48. err := in.decode(&helper)
  49. if err != nil {
  50. return err
  51. }
  52. if helper.off != len(buf) {
  53. return PacketDecodingError{"invalid length"}
  54. }
  55. return nil
  56. }
  57. func versionedDecode(buf []byte, in versionedDecoder, version int16) error {
  58. if buf == nil {
  59. return nil
  60. }
  61. helper := realDecoder{raw: buf}
  62. err := in.decode(&helper, version)
  63. if err != nil {
  64. return err
  65. }
  66. if helper.off != len(buf) {
  67. return PacketDecodingError{"invalid length"}
  68. }
  69. return nil
  70. }