fetch_response.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package kafka
  2. type fetchResponsePartitionBlock struct {
  3. id int32
  4. err KError
  5. highWaterMarkOffset int64
  6. msgSet messageSet
  7. }
  8. func (pr *fetchResponsePartitionBlock) decode(pd packetDecoder) (err error) {
  9. pr.id, err = pd.getInt32()
  10. if err != nil {
  11. return err
  12. }
  13. pr.err, err = pd.getError()
  14. if err != nil {
  15. return err
  16. }
  17. pr.highWaterMarkOffset, err = pd.getInt64()
  18. if err != nil {
  19. return err
  20. }
  21. msgSetSize, err := pd.getInt32()
  22. if err != nil {
  23. return err
  24. }
  25. msgSetDecoder, err := pd.getSubset(int(msgSetSize))
  26. if err != nil {
  27. return err
  28. }
  29. err = (&pr.msgSet).decode(msgSetDecoder)
  30. return err
  31. }
  32. type fetchResponseTopicBlock struct {
  33. name *string
  34. partitions []fetchResponsePartitionBlock
  35. }
  36. func (pr *fetchResponseTopicBlock) decode(pd packetDecoder) (err error) {
  37. pr.name, err = pd.getString()
  38. if err != nil {
  39. return err
  40. }
  41. n, err := pd.getArrayCount()
  42. if err != nil {
  43. return err
  44. }
  45. pr.partitions = make([]fetchResponsePartitionBlock, n)
  46. for i := range pr.partitions {
  47. err = (&pr.partitions[i]).decode(pd)
  48. if err != nil {
  49. return err
  50. }
  51. }
  52. return nil
  53. }
  54. type fetchResponse struct {
  55. topics []fetchResponseTopicBlock
  56. }
  57. func (pr *fetchResponse) decode(pd packetDecoder) (err error) {
  58. n, err := pd.getArrayCount()
  59. if err != nil {
  60. return err
  61. }
  62. pr.topics = make([]fetchResponseTopicBlock, n)
  63. for i := range pr.topics {
  64. err = (&pr.topics[i]).decode(pd)
  65. if err != nil {
  66. return err
  67. }
  68. }
  69. return nil
  70. }