metadata_response.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package kafka
  2. type PartitionMetadata struct {
  3. Err KError
  4. Id int32
  5. Leader int32
  6. Replicas []int32
  7. Isr []int32
  8. }
  9. func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) {
  10. pm.Err, err = pd.getError()
  11. if err != nil {
  12. return err
  13. }
  14. pm.Id, err = pd.getInt32()
  15. if err != nil {
  16. return err
  17. }
  18. pm.Leader, err = pd.getInt32()
  19. if err != nil {
  20. return err
  21. }
  22. pm.Replicas, err = pd.getInt32Array()
  23. if err != nil {
  24. return err
  25. }
  26. pm.Isr, err = pd.getInt32Array()
  27. if err != nil {
  28. return err
  29. }
  30. return nil
  31. }
  32. type TopicMetadata struct {
  33. Err KError
  34. Name *string
  35. Partitions []PartitionMetadata
  36. }
  37. func (tm *TopicMetadata) decode(pd packetDecoder) (err error) {
  38. tm.Err, err = pd.getError()
  39. if err != nil {
  40. return err
  41. }
  42. tm.Name, err = pd.getString()
  43. if err != nil {
  44. return err
  45. }
  46. n, err := pd.getArrayCount()
  47. if err != nil {
  48. return err
  49. }
  50. tm.Partitions = make([]PartitionMetadata, n)
  51. for i := 0; i < n; i++ {
  52. err = (&tm.Partitions[i]).decode(pd)
  53. if err != nil {
  54. return err
  55. }
  56. }
  57. return nil
  58. }
  59. type MetadataResponse struct {
  60. Brokers []Broker
  61. Topics []TopicMetadata
  62. }
  63. func (m *MetadataResponse) decode(pd packetDecoder) (err error) {
  64. n, err := pd.getArrayCount()
  65. if err != nil {
  66. return err
  67. }
  68. m.Brokers = make([]Broker, n)
  69. for i := 0; i < n; i++ {
  70. err = (&m.Brokers[i]).decode(pd)
  71. if err != nil {
  72. return err
  73. }
  74. }
  75. n, err = pd.getArrayCount()
  76. if err != nil {
  77. return err
  78. }
  79. m.Topics = make([]TopicMetadata, n)
  80. for i := 0; i < n; i++ {
  81. err = (&m.Topics[i]).decode(pd)
  82. if err != nil {
  83. return err
  84. }
  85. }
  86. return nil
  87. }