partitionMetadata.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package kafka
  2. import "errors"
  3. type partitionMetadata struct {
  4. err kafkaError
  5. id int32
  6. leader int32
  7. replicas []int32
  8. isr []int32
  9. }
  10. func (pm *partitionMetadata) length() (int, error) {
  11. length := 6
  12. length += 4
  13. length += len(pm.replicas) * 4
  14. length += 4
  15. length += len(pm.isr) * 4
  16. return length, nil
  17. }
  18. func (pm *partitionMetadata) encode(buf []byte, off int) int {
  19. off = encodeError(buf, off, pm.err)
  20. off = encodeInt32(buf, off, pm.id)
  21. off = encodeInt32(buf, off, pm.leader)
  22. off = encodeInt32(buf, off, int32(len(pm.replicas)))
  23. for _, val := range pm.replicas {
  24. off = encodeInt32(buf, off, val)
  25. }
  26. off = encodeInt32(buf, off, int32(len(pm.isr)))
  27. for _, val := range pm.isr {
  28. off = encodeInt32(buf, off, val)
  29. }
  30. return off
  31. }
  32. func (pm *partitionMetadata) decode(buf []byte, off int) (int, error) {
  33. if len(buf)-off < 14 {
  34. return -1, errors.New("kafka decode: not enough data")
  35. }
  36. pm.err, off = decodeError(buf, off)
  37. pm.id, off = decodeInt32(buf, off)
  38. pm.leader, off = decodeInt32(buf, off)
  39. tmp, off := decodeInt32(buf, off)
  40. length := int(tmp)
  41. if length > (len(buf)-off)/4 {
  42. return -1, errors.New("kafka decode: not enough data")
  43. }
  44. pm.replicas = make([]int32, length)
  45. for i := 0; i < length; i++ {
  46. pm.replicas[i], off = decodeInt32(buf, off)
  47. }
  48. tmp, off = decodeInt32(buf, off)
  49. length = int(tmp)
  50. if length > (len(buf)-off)/4 {
  51. return -1, errors.New("kafka decode: not enough data")
  52. }
  53. pm.isr = make([]int32, length)
  54. for i := 0; i < length; i++ {
  55. pm.isr[i], off = decodeInt32(buf, off)
  56. }
  57. return off, nil
  58. }