fetch_response.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package sarama
  2. type FetchResponseBlock struct {
  3. Err KError
  4. HighWaterMarkOffset int64
  5. MsgSet MessageSet
  6. }
  7. func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) {
  8. tmp, err := pd.getInt16()
  9. if err != nil {
  10. return err
  11. }
  12. pr.Err = KError(tmp)
  13. pr.HighWaterMarkOffset, err = pd.getInt64()
  14. if err != nil {
  15. return err
  16. }
  17. msgSetSize, err := pd.getInt32()
  18. if err != nil {
  19. return err
  20. }
  21. msgSetDecoder, err := pd.getSubset(int(msgSetSize))
  22. if err != nil {
  23. return err
  24. }
  25. err = (&pr.MsgSet).decode(msgSetDecoder)
  26. return err
  27. }
  28. type FetchResponse struct {
  29. Blocks map[string]map[int32]*FetchResponseBlock
  30. }
  31. func (fr *FetchResponse) decode(pd packetDecoder) (err error) {
  32. numTopics, err := pd.getArrayLength()
  33. if err != nil {
  34. return err
  35. }
  36. fr.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
  37. for i := 0; i < numTopics; i++ {
  38. name, err := pd.getString()
  39. if err != nil {
  40. return err
  41. }
  42. numBlocks, err := pd.getArrayLength()
  43. if err != nil {
  44. return err
  45. }
  46. fr.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
  47. for j := 0; j < numBlocks; j++ {
  48. id, err := pd.getInt32()
  49. if err != nil {
  50. return err
  51. }
  52. block := new(FetchResponseBlock)
  53. err = block.decode(pd)
  54. if err != nil {
  55. return err
  56. }
  57. fr.Blocks[name][id] = block
  58. }
  59. }
  60. return nil
  61. }
  62. func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
  63. if fr.Blocks == nil {
  64. return nil
  65. }
  66. if fr.Blocks[topic] == nil {
  67. return nil
  68. }
  69. return fr.Blocks[topic][partition]
  70. }