offset_response.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. package sarama
  2. type OffsetResponseBlock struct {
  3. Err KError
  4. Offsets []int64
  5. }
  6. func (r *OffsetResponseBlock) decode(pd packetDecoder) (err error) {
  7. tmp, err := pd.getInt16()
  8. if err != nil {
  9. return err
  10. }
  11. r.Err = KError(tmp)
  12. r.Offsets, err = pd.getInt64Array()
  13. return err
  14. }
  15. type OffsetResponse struct {
  16. Blocks map[string]map[int32]*OffsetResponseBlock
  17. }
  18. func (r *OffsetResponse) decode(pd packetDecoder) (err error) {
  19. numTopics, err := pd.getArrayLength()
  20. if err != nil {
  21. return err
  22. }
  23. r.Blocks = make(map[string]map[int32]*OffsetResponseBlock, numTopics)
  24. for i := 0; i < numTopics; i++ {
  25. name, err := pd.getString()
  26. if err != nil {
  27. return err
  28. }
  29. numBlocks, err := pd.getArrayLength()
  30. if err != nil {
  31. return err
  32. }
  33. r.Blocks[name] = make(map[int32]*OffsetResponseBlock, numBlocks)
  34. for j := 0; j < numBlocks; j++ {
  35. id, err := pd.getInt32()
  36. if err != nil {
  37. return err
  38. }
  39. block := new(OffsetResponseBlock)
  40. err = block.decode(pd)
  41. if err != nil {
  42. return err
  43. }
  44. r.Blocks[name][id] = block
  45. }
  46. }
  47. return nil
  48. }