length_field.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package sarama
  2. import "encoding/binary"
  3. // LengthField implements the PushEncoder and PushDecoder interfaces for calculating 4-byte lengths.
  4. type lengthField struct {
  5. startOffset int
  6. length int32
  7. }
  8. func (l *lengthField) decode(pd packetDecoder) error {
  9. var err error
  10. l.length, err = pd.getInt32()
  11. if err != nil {
  12. return err
  13. }
  14. if l.length > int32(pd.remaining()) {
  15. return ErrInsufficientData
  16. }
  17. return nil
  18. }
  19. func (l *lengthField) saveOffset(in int) {
  20. l.startOffset = in
  21. }
  22. func (l *lengthField) reserveLength() int {
  23. return 4
  24. }
  25. func (l *lengthField) run(curOffset int, buf []byte) error {
  26. binary.BigEndian.PutUint32(buf[l.startOffset:], uint32(curOffset-l.startOffset-4))
  27. return nil
  28. }
  29. func (l *lengthField) check(curOffset int, buf []byte) error {
  30. if int32(curOffset-l.startOffset-4) != l.length {
  31. return PacketDecodingError{"length field invalid"}
  32. }
  33. return nil
  34. }
  35. type varintLengthField struct {
  36. startOffset int
  37. length int64
  38. }
  39. func (l *varintLengthField) decode(pd packetDecoder) error {
  40. var err error
  41. l.length, err = pd.getVarint()
  42. return err
  43. }
  44. func (l *varintLengthField) saveOffset(in int) {
  45. l.startOffset = in
  46. }
  47. func (l *varintLengthField) adjustLength(currOffset int) int {
  48. oldFieldSize := l.reserveLength()
  49. l.length = int64(currOffset - l.startOffset - oldFieldSize)
  50. return l.reserveLength() - oldFieldSize
  51. }
  52. func (l *varintLengthField) reserveLength() int {
  53. var tmp [binary.MaxVarintLen64]byte
  54. return binary.PutVarint(tmp[:], l.length)
  55. }
  56. func (l *varintLengthField) run(curOffset int, buf []byte) error {
  57. binary.PutVarint(buf[l.startOffset:], l.length)
  58. return nil
  59. }
  60. func (l *varintLengthField) check(curOffset int, buf []byte) error {
  61. if int64(curOffset-l.startOffset-l.reserveLength()) != l.length {
  62. return PacketDecodingError{"length field invalid"}
  63. }
  64. return nil
  65. }