// Source file: fetch_response.go (1.3 KB)

  1. package kafka
  2. type FetchResponseBlock struct {
  3. err KError
  4. highWaterMarkOffset int64
  5. msgSet messageSet
  6. }
  7. func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) {
  8. pr.err, err = pd.getError()
  9. if err != nil {
  10. return err
  11. }
  12. pr.highWaterMarkOffset, err = pd.getInt64()
  13. if err != nil {
  14. return err
  15. }
  16. msgSetSize, err := pd.getInt32()
  17. if err != nil {
  18. return err
  19. }
  20. msgSetDecoder, err := pd.getSubset(int(msgSetSize))
  21. if err != nil {
  22. return err
  23. }
  24. err = (&pr.msgSet).decode(msgSetDecoder)
  25. return err
  26. }
  27. type FetchResponse struct {
  28. Blocks map[*string]map[int32]*FetchResponseBlock
  29. }
  30. func (fr *FetchResponse) decode(pd packetDecoder) (err error) {
  31. numTopics, err := pd.getArrayCount()
  32. if err != nil {
  33. return err
  34. }
  35. fr.Blocks = make(map[*string]map[int32]*FetchResponseBlock, numTopics)
  36. for i := 0; i < numTopics; i++ {
  37. name, err := pd.getString()
  38. if err != nil {
  39. return err
  40. }
  41. numBlocks, err := pd.getArrayCount()
  42. if err != nil {
  43. return err
  44. }
  45. fr.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
  46. for j := 0; j < numBlocks; j++ {
  47. id, err := pd.getInt32()
  48. if err != nil {
  49. return err
  50. }
  51. block := new(FetchResponseBlock)
  52. err = block.decode(pd)
  53. if err != nil {
  54. return err
  55. }
  56. fr.Blocks[name][id] = block
  57. }
  58. }
  59. return nil
  60. }