offset_response.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package protocol
  2. import enc "sarama/encoding"
  3. import "sarama/types"
  4. type OffsetResponseBlock struct {
  5. Err types.KError
  6. Offsets []int64
  7. }
  8. func (r *OffsetResponseBlock) Decode(pd enc.PacketDecoder) (err error) {
  9. tmp, err := pd.GetInt16()
  10. if err != nil {
  11. return err
  12. }
  13. r.Err = types.KError(tmp)
  14. r.Offsets, err = pd.GetInt64Array()
  15. return err
  16. }
  17. type OffsetResponse struct {
  18. Blocks map[string]map[int32]*OffsetResponseBlock
  19. }
  20. func (r *OffsetResponse) Decode(pd enc.PacketDecoder) (err error) {
  21. numTopics, err := pd.GetArrayLength()
  22. if err != nil {
  23. return err
  24. }
  25. r.Blocks = make(map[string]map[int32]*OffsetResponseBlock, numTopics)
  26. for i := 0; i < numTopics; i++ {
  27. name, err := pd.GetString()
  28. if err != nil {
  29. return err
  30. }
  31. numBlocks, err := pd.GetArrayLength()
  32. if err != nil {
  33. return err
  34. }
  35. r.Blocks[name] = make(map[int32]*OffsetResponseBlock, numBlocks)
  36. for j := 0; j < numBlocks; j++ {
  37. id, err := pd.GetInt32()
  38. if err != nil {
  39. return err
  40. }
  41. block := new(OffsetResponseBlock)
  42. err = block.Decode(pd)
  43. if err != nil {
  44. return err
  45. }
  46. r.Blocks[name][id] = block
  47. }
  48. }
  49. return nil
  50. }