fetch_response.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package kafka
  2. type fetchResponsePartitionBlock struct {
  3. id int32
  4. err KError
  5. highWaterMarkOffset int64
  6. msgSet messageSet
  7. }
  8. func (pr *fetchResponsePartitionBlock) decode(pd packetDecoder) (err error) {
  9. pr.id, err = pd.getInt32()
  10. if err != nil {
  11. return err
  12. }
  13. pr.err, err = pd.getError()
  14. if err != nil {
  15. return err
  16. }
  17. pr.highWaterMarkOffset, err = pd.getInt64()
  18. if err != nil {
  19. return err
  20. }
  21. msgSetSize, err := pd.getInt32()
  22. if err != nil {
  23. return err
  24. }
  25. msgSetDecoder, err := pd.getSubset(int(msgSetSize))
  26. if err != nil {
  27. return err
  28. }
  29. err = (&pr.msgSet).decode(msgSetDecoder)
  30. return err
  31. }
  32. type fetchResponseTopicBlock struct {
  33. name *string
  34. partitions []fetchResponsePartitionBlock
  35. }
  36. func (pr *fetchResponseTopicBlock) decode(pd packetDecoder) (err error) {
  37. pr.name, err = pd.getString()
  38. if err != nil {
  39. return err
  40. }
  41. n, err := pd.getArrayCount()
  42. if err != nil {
  43. return err
  44. }
  45. pr.partitions = make([]fetchResponsePartitionBlock, n)
  46. for i := range pr.partitions {
  47. err = (&pr.partitions[i]).decode(pd)
  48. if err != nil {
  49. return err
  50. }
  51. }
  52. return nil
  53. }
  54. type fetchResponse struct {
  55. topics []fetchResponseTopicBlock
  56. }
  57. func (pr *fetchResponse) decode(pd packetDecoder) (err error) {
  58. n, err := pd.getArrayCount()
  59. if err != nil {
  60. return err
  61. }
  62. pr.topics = make([]fetchResponseTopicBlock, n)
  63. for i := range pr.topics {
  64. err = (&pr.topics[i]).decode(pd)
  65. if err != nil {
  66. return err
  67. }
  68. }
  69. return nil
  70. }
  71. func (pr *fetchResponse) staleTopics() []*string {
  72. ret := make([]*string, 0)
  73. for i := range pr.topics {
  74. topic := &pr.topics[i]
  75. currentTopic:
  76. for j := range topic.partitions {
  77. partition := &topic.partitions[j]
  78. switch partition.err {
  79. case UNKNOWN, UNKNOWN_TOPIC_OR_PARTITION, LEADER_NOT_AVAILABLE, NOT_LEADER_FOR_PARTITION:
  80. ret = append(ret, topic.name)
  81. break currentTopic
  82. }
  83. }
  84. }
  85. return ret
  86. }