protocol.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package kafka
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "math"
  6. )
  7. func stringLen(in *string) (n int, err error) {
  8. if in == nil {
  9. return 2, nil
  10. }
  11. n = len(*in)
  12. if n > math.MaxInt16 {
  13. return -1, errors.New("kafka: string too long to encode")
  14. }
  15. return 2 + n, nil
  16. }
  17. func encodeString(buf []byte, off int, in *string) (int, error) {
  18. available := len(buf) - off
  19. if available < 2 {
  20. return -1, errors.New("kafka: buffer too short to encode any string")
  21. }
  22. n := -1
  23. if in != nil {
  24. n = len(*in)
  25. }
  26. if n > math.MaxInt16 {
  27. return -1, errors.New("kafka: string too long to encode")
  28. }
  29. if n > available-2 {
  30. return -1, errors.New("kafka: buffer too short to encode string")
  31. }
  32. binary.BigEndian.PutUint16(buf[off:], uint16(n))
  33. off += 2
  34. if n > 0 {
  35. copy(buf[off:], *in)
  36. }
  37. off += n
  38. return off, nil
  39. }
  40. func decodeString(buf []byte) (out *string, err error) {
  41. if len(buf) < 2 {
  42. return nil, errors.New("kafka: buffer too short to contain string")
  43. }
  44. n := int16(binary.BigEndian.Uint16(buf))
  45. switch {
  46. case n < -1:
  47. return nil, errors.New("kafka: invalid negative string length")
  48. case n == -1:
  49. return nil, nil
  50. case n == 0:
  51. emptyString := ""
  52. return &emptyString, nil
  53. case int(n) > len(buf)-2:
  54. return nil, errors.New("kafka: buffer too short to decode string")
  55. default:
  56. result := string(buf[2:])
  57. return &result, nil
  58. }
  59. }
  60. func bytesLen(in *[]byte) (n int, err error) {
  61. if in == nil {
  62. return 4, nil
  63. }
  64. n = len(*in)
  65. if n > math.MaxInt32 {
  66. return -1, errors.New("kafka: bytes too long to encode")
  67. }
  68. return 4 + n, nil
  69. }
  70. func encodeBytes(buf []byte, off int, in *[]byte) (int, error) {
  71. available := len(buf) - off
  72. if available < 4 {
  73. return -1, errors.New("kafka: buffer too short to encode any bytes")
  74. }
  75. n := -1
  76. if in != nil {
  77. n = len(*in)
  78. }
  79. if n > math.MaxInt32 {
  80. return -1, errors.New("kafka: bytes too long to encode")
  81. }
  82. if n > available-4 {
  83. return -1, errors.New("kafka: buffer too short to encode bytes")
  84. }
  85. binary.BigEndian.PutUint32(buf[off:], uint32(n))
  86. off += 4
  87. if n > 0 {
  88. copy(buf[off:], *in)
  89. }
  90. off += n
  91. return off, nil
  92. }
  93. func decodebyte(buf []byte) (out *[]byte, err error) {
  94. if len(buf) < 4 {
  95. return nil, errors.New("kafka: buffer too short to contain bytes")
  96. }
  97. n := int32(binary.BigEndian.Uint32(buf))
  98. switch {
  99. case n < -1:
  100. return nil, errors.New("kafka: invalid negative byte length")
  101. case n == -1:
  102. return nil, nil
  103. case n == 0:
  104. emptyBytes := make([]byte, 0)
  105. return &emptyBytes, nil
  106. case int(n) > len(buf)-4:
  107. return nil, errors.New("kafka: buffer too short to decode bytes")
  108. default:
  109. result := buf[4:]
  110. return &result, nil
  111. }
  112. }