fetch_response.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package kafka
  2. type fetchResponsePartitionBlock struct {
  3. id int32
  4. err KError
  5. highWaterMarkOffset int64
  6. msgSet messageSet
  7. }
  8. func (pr *fetchResponsePartitionBlock) decode(pd packetDecoder) (err error) {
  9. pr.id, err = pd.getInt32()
  10. if err != nil {
  11. return err
  12. }
  13. pr.err, err = pd.getError()
  14. if err != nil {
  15. return err
  16. }
  17. pr.highWaterMarkOffset, err = pd.getInt64()
  18. if err != nil {
  19. return err
  20. }
  21. return nil
  22. }
  23. type fetchResponseTopicBlock struct {
  24. name *string
  25. partitions []fetchResponsePartitionBlock
  26. }
  27. func (pr *fetchResponseTopicBlock) decode(pd packetDecoder) (err error) {
  28. pr.name, err = pd.getString()
  29. if err != nil {
  30. return err
  31. }
  32. n, err := pd.getArrayCount()
  33. if err != nil {
  34. return err
  35. }
  36. pr.partitions = make([]fetchResponsePartitionBlock, n)
  37. for i := range pr.partitions {
  38. err = (&pr.partitions[i]).decode(pd)
  39. if err != nil {
  40. return err
  41. }
  42. }
  43. return nil
  44. }
  45. type fetchResponse struct {
  46. topics []fetchResponseTopicBlock
  47. }
  48. func (pr *fetchResponse) decode(pd packetDecoder) (err error) {
  49. n, err := pd.getArrayCount()
  50. if err != nil {
  51. return err
  52. }
  53. pr.topics = make([]fetchResponseTopicBlock, n)
  54. for i := range pr.topics {
  55. err = (&pr.topics[i]).decode(pd)
  56. if err != nil {
  57. return err
  58. }
  59. }
  60. return nil
  61. }
  62. func (pr *fetchResponse) staleTopics() []*string {
  63. ret := make([]*string, 0)
  64. for i := range pr.topics {
  65. topic := &pr.topics[i]
  66. currentTopic:
  67. for j := range topic.partitions {
  68. partition := &topic.partitions[j]
  69. switch partition.err {
  70. case UNKNOWN, UNKNOWN_TOPIC_OR_PARTITION, LEADER_NOT_AVAILABLE, NOT_LEADER_FOR_PARTITION:
  71. ret = append(ret, topic.name)
  72. break currentTopic
  73. }
  74. }
  75. }
  76. return ret
  77. }