length_field.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "sync"
  5. )
  6. // LengthField implements the PushEncoder and PushDecoder interfaces for calculating 4-byte lengths.
  7. type lengthField struct {
  8. startOffset int
  9. length int32
  10. }
  11. var lengthFieldPool = sync.Pool{}
  12. func acquireLengthField() *lengthField {
  13. val := lengthFieldPool.Get()
  14. if val != nil {
  15. return val.(*lengthField)
  16. }
  17. return &lengthField{}
  18. }
  19. func releaseLengthField(m *lengthField) {
  20. lengthFieldPool.Put(m)
  21. }
  22. func (l *lengthField) decode(pd packetDecoder) error {
  23. var err error
  24. l.length, err = pd.getInt32()
  25. if err != nil {
  26. return err
  27. }
  28. if l.length > int32(pd.remaining()) {
  29. return ErrInsufficientData
  30. }
  31. return nil
  32. }
  33. func (l *lengthField) saveOffset(in int) {
  34. l.startOffset = in
  35. }
  36. func (l *lengthField) reserveLength() int {
  37. return 4
  38. }
  39. func (l *lengthField) run(curOffset int, buf []byte) error {
  40. binary.BigEndian.PutUint32(buf[l.startOffset:], uint32(curOffset-l.startOffset-4))
  41. return nil
  42. }
  43. func (l *lengthField) check(curOffset int, buf []byte) error {
  44. if int32(curOffset-l.startOffset-4) != l.length {
  45. return PacketDecodingError{"length field invalid"}
  46. }
  47. return nil
  48. }
  49. type varintLengthField struct {
  50. startOffset int
  51. length int64
  52. }
  53. func (l *varintLengthField) decode(pd packetDecoder) error {
  54. var err error
  55. l.length, err = pd.getVarint()
  56. return err
  57. }
  58. func (l *varintLengthField) saveOffset(in int) {
  59. l.startOffset = in
  60. }
  61. func (l *varintLengthField) adjustLength(currOffset int) int {
  62. oldFieldSize := l.reserveLength()
  63. l.length = int64(currOffset - l.startOffset - oldFieldSize)
  64. return l.reserveLength() - oldFieldSize
  65. }
  66. func (l *varintLengthField) reserveLength() int {
  67. var tmp [binary.MaxVarintLen64]byte
  68. return binary.PutVarint(tmp[:], l.length)
  69. }
  70. func (l *varintLengthField) run(curOffset int, buf []byte) error {
  71. binary.PutVarint(buf[l.startOffset:], l.length)
  72. return nil
  73. }
  74. func (l *varintLengthField) check(curOffset int, buf []byte) error {
  75. if int64(curOffset-l.startOffset-l.reserveLength()) != l.length {
  76. return PacketDecodingError{"length field invalid"}
  77. }
  78. return nil
  79. }