fetch_response.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package protocol
  2. import enc "sarama/encoding"
  3. import "sarama/types"
  4. type FetchResponseBlock struct {
  5. Err types.KError
  6. HighWaterMarkOffset int64
  7. MsgSet MessageSet
  8. }
  9. func (pr *FetchResponseBlock) Decode(pd enc.PacketDecoder) (err error) {
  10. tmp, err := pd.GetInt16()
  11. if err != nil {
  12. return err
  13. }
  14. pr.Err = types.KError(tmp)
  15. pr.HighWaterMarkOffset, err = pd.GetInt64()
  16. if err != nil {
  17. return err
  18. }
  19. msgSetSize, err := pd.GetInt32()
  20. if err != nil {
  21. return err
  22. }
  23. msgSetDecoder, err := pd.GetSubset(int(msgSetSize))
  24. if err != nil {
  25. return err
  26. }
  27. err = (&pr.MsgSet).Decode(msgSetDecoder)
  28. return err
  29. }
  30. type FetchResponse struct {
  31. Blocks map[string]map[int32]*FetchResponseBlock
  32. }
  33. func (fr *FetchResponse) Decode(pd enc.PacketDecoder) (err error) {
  34. numTopics, err := pd.GetArrayLength()
  35. if err != nil {
  36. return err
  37. }
  38. fr.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
  39. for i := 0; i < numTopics; i++ {
  40. name, err := pd.GetString()
  41. if err != nil {
  42. return err
  43. }
  44. numBlocks, err := pd.GetArrayLength()
  45. if err != nil {
  46. return err
  47. }
  48. fr.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
  49. for j := 0; j < numBlocks; j++ {
  50. id, err := pd.GetInt32()
  51. if err != nil {
  52. return err
  53. }
  54. block := new(FetchResponseBlock)
  55. err = block.Decode(pd)
  56. if err != nil {
  57. return err
  58. }
  59. fr.Blocks[name][id] = block
  60. }
  61. }
  62. return nil
  63. }
  64. func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
  65. if fr.Blocks == nil {
  66. return nil
  67. }
  68. if fr.Blocks[topic] == nil {
  69. return nil
  70. }
  71. return fr.Blocks[topic][partition]
  72. }