offset_fetch_response.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package protocol
  2. import enc "sarama/encoding"
  3. import "sarama/types"
  4. type OffsetFetchResponseBlock struct {
  5. Offset int64
  6. Metadata string
  7. Err types.KError
  8. }
  9. func (r *OffsetFetchResponseBlock) Decode(pd enc.PacketDecoder) (err error) {
  10. r.Offset, err = pd.GetInt64()
  11. if err != nil {
  12. return err
  13. }
  14. r.Metadata, err = pd.GetString()
  15. if err != nil {
  16. return err
  17. }
  18. tmp, err := pd.GetInt16()
  19. if err != nil {
  20. return err
  21. }
  22. r.Err = types.KError(tmp)
  23. return nil
  24. }
  25. type OffsetFetchResponse struct {
  26. ClientID string
  27. Blocks map[string]map[int32]*OffsetFetchResponseBlock
  28. }
  29. func (r *OffsetFetchResponse) Decode(pd enc.PacketDecoder) (err error) {
  30. r.ClientID, err = pd.GetString()
  31. if err != nil {
  32. return err
  33. }
  34. numTopics, err := pd.GetArrayLength()
  35. if err != nil {
  36. return err
  37. }
  38. r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
  39. for i := 0; i < numTopics; i++ {
  40. name, err := pd.GetString()
  41. if err != nil {
  42. return err
  43. }
  44. numBlocks, err := pd.GetArrayLength()
  45. if err != nil {
  46. return err
  47. }
  48. r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
  49. for j := 0; j < numBlocks; j++ {
  50. id, err := pd.GetInt32()
  51. if err != nil {
  52. return err
  53. }
  54. block := new(OffsetFetchResponseBlock)
  55. err = block.Decode(pd)
  56. if err != nil {
  57. return err
  58. }
  59. r.Blocks[name][id] = block
  60. }
  61. }
  62. return nil
  63. }