realDecoder.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package kafka
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "math"
  6. )
  7. type realDecoder struct {
  8. raw []byte
  9. off int
  10. }
  11. func (rd *realDecoder) avail() int {
  12. return len(rd.raw) - rd.off
  13. }
  14. func (rd *realDecoder) getInt16() (int16, error) {
  15. if rd.avail() < 2 {
  16. return -1, errors.New("kafka getInt16: not enough data")
  17. }
  18. tmp := int16(binary.BigEndian.Uint16(rd.raw[rd.off:]))
  19. rd.off += 2
  20. return tmp, nil
  21. }
  22. func (rd *realDecoder) getInt32() (int32, error) {
  23. if rd.avail() < 4 {
  24. return -1, errors.New("kafka getInt32: not enough data")
  25. }
  26. tmp := int32(binary.BigEndian.Uint32(rd.raw[rd.off:]))
  27. rd.off += 4
  28. return tmp, nil
  29. }
  30. func (rd *realDecoder) getError() (kafkaError, error) {
  31. val, err := rd.getInt16()
  32. return kafkaError(val), err
  33. }
  34. func (rd *realDecoder) getString() (*string, error) {
  35. tmp, err := rd.getInt16()
  36. if err != nil {
  37. return nil, err
  38. }
  39. n := int(tmp)
  40. switch {
  41. case n < -1:
  42. return nil, errors.New("kafka getString: invalid negative length")
  43. case n == -1:
  44. return nil, nil
  45. case n == 0:
  46. return new(string), nil
  47. case n > rd.avail():
  48. return nil, errors.New("kafka getString: not enough data")
  49. default:
  50. tmp := new(string)
  51. *tmp = string(rd.raw[rd.off : rd.off+n])
  52. return tmp, nil
  53. }
  54. }
  55. func (rd *realDecoder) getBytes() (*[]byte, error) {
  56. tmp, err := rd.getInt32()
  57. if err != nil {
  58. return nil, err
  59. }
  60. n := int(tmp)
  61. switch {
  62. case n < -1:
  63. return nil, errors.New("kafka getBytes: invalid negative length")
  64. case n == -1:
  65. return nil, nil
  66. case n == 0:
  67. tmp := make([]byte, 0)
  68. return &tmp, nil
  69. case n > rd.avail():
  70. return nil, errors.New("kafka getString: not enough data")
  71. default:
  72. tmp := rd.raw[rd.off : rd.off+n]
  73. return &tmp, nil
  74. }
  75. }
  76. func (rd *realDecoder) getArrayCount() (int, error) {
  77. if rd.avail() < 4 {
  78. return -1, errors.New("kafka getArrayCount: not enough data")
  79. }
  80. tmp := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
  81. rd.off += 4
  82. if tmp > rd.avail() || tmp > 2*math.MaxUint16 {
  83. return -1, errors.New("kafka getArrayCount: unreasonably long array")
  84. }
  85. return tmp, nil
  86. }