// offset_response.go (1.3 KB)
  1. package sarama
  2. type OffsetResponseBlock struct {
  3. Err KError
  4. Offsets []int64
  5. }
  6. func (r *OffsetResponseBlock) decode(pd packetDecoder) (err error) {
  7. tmp, err := pd.getInt16()
  8. if err != nil {
  9. return err
  10. }
  11. r.Err = KError(tmp)
  12. r.Offsets, err = pd.getInt64Array()
  13. return err
  14. }
  15. type OffsetResponse struct {
  16. Blocks map[string]map[int32]*OffsetResponseBlock
  17. }
  18. func (r *OffsetResponse) decode(pd packetDecoder) (err error) {
  19. numTopics, err := pd.getArrayLength()
  20. if err != nil {
  21. return err
  22. }
  23. r.Blocks = make(map[string]map[int32]*OffsetResponseBlock, numTopics)
  24. for i := 0; i < numTopics; i++ {
  25. name, err := pd.getString()
  26. if err != nil {
  27. return err
  28. }
  29. numBlocks, err := pd.getArrayLength()
  30. if err != nil {
  31. return err
  32. }
  33. r.Blocks[name] = make(map[int32]*OffsetResponseBlock, numBlocks)
  34. for j := 0; j < numBlocks; j++ {
  35. id, err := pd.getInt32()
  36. if err != nil {
  37. return err
  38. }
  39. block := new(OffsetResponseBlock)
  40. err = block.decode(pd)
  41. if err != nil {
  42. return err
  43. }
  44. r.Blocks[name][id] = block
  45. }
  46. }
  47. return nil
  48. }
  49. func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock {
  50. if r.Blocks == nil {
  51. return nil
  52. }
  53. if r.Blocks[topic] == nil {
  54. return nil
  55. }
  56. return r.Blocks[topic][partition]
  57. }