offset_fetch_response.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package sarama
  2. type OffsetFetchResponseBlock struct {
  3. Offset int64
  4. Metadata string
  5. Err KError
  6. }
  7. func (r *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
  8. r.Offset, err = pd.getInt64()
  9. if err != nil {
  10. return err
  11. }
  12. r.Metadata, err = pd.getString()
  13. if err != nil {
  14. return err
  15. }
  16. tmp, err := pd.getInt16()
  17. if err != nil {
  18. return err
  19. }
  20. r.Err = KError(tmp)
  21. return nil
  22. }
  23. type OffsetFetchResponse struct {
  24. ClientID string
  25. Blocks map[string]map[int32]*OffsetFetchResponseBlock
  26. }
  27. func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) {
  28. r.ClientID, err = pd.getString()
  29. if err != nil {
  30. return err
  31. }
  32. numTopics, err := pd.getArrayLength()
  33. if err != nil {
  34. return err
  35. }
  36. r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
  37. for i := 0; i < numTopics; i++ {
  38. name, err := pd.getString()
  39. if err != nil {
  40. return err
  41. }
  42. numBlocks, err := pd.getArrayLength()
  43. if err != nil {
  44. return err
  45. }
  46. r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
  47. for j := 0; j < numBlocks; j++ {
  48. id, err := pd.getInt32()
  49. if err != nil {
  50. return err
  51. }
  52. block := new(OffsetFetchResponseBlock)
  53. err = block.decode(pd)
  54. if err != nil {
  55. return err
  56. }
  57. r.Blocks[name][id] = block
  58. }
  59. }
  60. return nil
  61. }