fetch_response.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package protocol
  2. import enc "sarama/encoding"
  3. type FetchResponseBlock struct {
  4. Err KError
  5. HighWaterMarkOffset int64
  6. MsgSet MessageSet
  7. }
  8. func (pr *FetchResponseBlock) Decode(pd enc.PacketDecoder) (err error) {
  9. pr.Err, err = pd.GetError()
  10. if err != nil {
  11. return err
  12. }
  13. pr.HighWaterMarkOffset, err = pd.GetInt64()
  14. if err != nil {
  15. return err
  16. }
  17. msgSetSize, err := pd.GetInt32()
  18. if err != nil {
  19. return err
  20. }
  21. msgSetDecoder, err := pd.GetSubset(int(msgSetSize))
  22. if err != nil {
  23. return err
  24. }
  25. err = (&pr.MsgSet).Decode(msgSetDecoder)
  26. return err
  27. }
  28. type FetchResponse struct {
  29. Blocks map[string]map[int32]*FetchResponseBlock
  30. }
  31. func (fr *FetchResponse) Decode(pd enc.PacketDecoder) (err error) {
  32. numTopics, err := pd.GetArrayLength()
  33. if err != nil {
  34. return err
  35. }
  36. fr.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
  37. for i := 0; i < numTopics; i++ {
  38. name, err := pd.GetString()
  39. if err != nil {
  40. return err
  41. }
  42. numBlocks, err := pd.GetArrayLength()
  43. if err != nil {
  44. return err
  45. }
  46. fr.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
  47. for j := 0; j < numBlocks; j++ {
  48. id, err := pd.GetInt32()
  49. if err != nil {
  50. return err
  51. }
  52. block := new(FetchResponseBlock)
  53. err = block.decode(pd)
  54. if err != nil {
  55. return err
  56. }
  57. fr.Blocks[name][id] = block
  58. }
  59. }
  60. return nil
  61. }
  62. func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
  63. if fr.Blocks == nil {
  64. return nil
  65. }
  66. if fr.Blocks[topic] == nil {
  67. return nil
  68. }
  69. return fr.Blocks[topic][partition]
  70. }