protocol.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package kafka
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "math"
  6. )
  7. func stringLen(in *string) (n int, err error) {
  8. if in == nil {
  9. return 2, nil
  10. }
  11. n = len(*in)
  12. if n > math.MaxInt16 {
  13. return -1, errors.New("kafka: string too long to encode")
  14. }
  15. return 2 + n, nil
  16. }
  17. func encodeString(in *string, buf []byte) (err error) {
  18. if len(buf) < 2 {
  19. return errors.New("kafka: buffer too short to encode any string")
  20. }
  21. n := -1
  22. if in != nil {
  23. n = len(*in)
  24. }
  25. if n > math.MaxInt16 {
  26. return errors.New("kafka: string too long to encode")
  27. }
  28. if n > len(buf) {
  29. return errors.New("kafka: buffer too short to encode string")
  30. }
  31. binary.BigEndian.PutUint16(buf, uint16(n))
  32. if n > 0 {
  33. copy(buf[2:], *in)
  34. }
  35. return nil
  36. }
  37. func decodeString(buf []byte) (out *string, err error) {
  38. if len(buf) < 2 {
  39. return nil, errors.New("kafka: buffer too short to contain string")
  40. }
  41. n := int16(binary.BigEndian.Uint16(buf))
  42. switch {
  43. case n < -1:
  44. return nil, errors.New("kafka: invalid negative string length")
  45. case n == -1:
  46. return nil, nil
  47. case n == 0:
  48. emptyString := ""
  49. return &emptyString, nil
  50. case int(n) > len(buf)-2:
  51. return nil, errors.New("kafka: buffer too short to decode string")
  52. default:
  53. result := string(buf[2:])
  54. return &result, nil
  55. }
  56. }
  57. func bytesLen(in *[]byte) (n int, err error) {
  58. if in == nil {
  59. return 4, nil
  60. }
  61. n = len(*in)
  62. if n > math.MaxInt32 {
  63. return -1, errors.New("kafka: bytes too long to encode")
  64. }
  65. return 4 + n, nil
  66. }
  67. func encodeBytes(in *[]byte, buf []byte) (err error) {
  68. if len(buf) < 4 {
  69. return errors.New("kafka: buffer too short to encode any bytes")
  70. }
  71. n := -1
  72. if in != nil {
  73. n = len(*in)
  74. }
  75. if n > math.MaxInt32 {
  76. return errors.New("kafka: bytes too long to encode")
  77. }
  78. if n > len(buf) {
  79. return errors.New("kafka: buffer too short to encode bytes")
  80. }
  81. binary.BigEndian.PutUint32(buf, uint32(n))
  82. if n > 0 {
  83. copy(buf[4:], *in)
  84. }
  85. return nil
  86. }
  87. func decodebyte(buf []byte) (out *[]byte, err error) {
  88. if len(buf) < 4 {
  89. return nil, errors.New("kafka: buffer too short to contain bytes")
  90. }
  91. n := int32(binary.BigEndian.Uint32(buf))
  92. switch {
  93. case n < -1:
  94. return nil, errors.New("kafka: invalid negative byte length")
  95. case n == -1:
  96. return nil, nil
  97. case n == 0:
  98. emptyBytes := make([]byte, 0)
  99. return &emptyBytes, nil
  100. case int(n) > len(buf)-4:
  101. return nil, errors.New("kafka: buffer too short to decode bytes")
  102. default:
  103. result := buf[4:]
  104. return &result, nil
  105. }
  106. }