offset_response.go 1.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. package protocol
  2. type OffsetResponseBlock struct {
  3. err KError
  4. offsets []int64
  5. }
  6. func (r *OffsetResponseBlock) decode(pd packetDecoder) (err error) {
  7. r.err, err = pd.getError()
  8. if err != nil {
  9. return err
  10. }
  11. r.offsets, err = pd.getInt64Array()
  12. return err
  13. }
  14. type OffsetResponse struct {
  15. Blocks map[string]map[int32]*OffsetResponseBlock
  16. }
  17. func (r *OffsetResponse) decode(pd packetDecoder) (err error) {
  18. numTopics, err := pd.getArrayCount()
  19. if err != nil {
  20. return err
  21. }
  22. r.Blocks = make(map[string]map[int32]*OffsetResponseBlock, numTopics)
  23. for i := 0; i < numTopics; i++ {
  24. name, err := pd.getString()
  25. if err != nil {
  26. return err
  27. }
  28. numBlocks, err := pd.getArrayCount()
  29. if err != nil {
  30. return err
  31. }
  32. r.Blocks[name] = make(map[int32]*OffsetResponseBlock, numBlocks)
  33. for j := 0; j < numBlocks; j++ {
  34. id, err := pd.getInt32()
  35. if err != nil {
  36. return err
  37. }
  38. block := new(OffsetResponseBlock)
  39. err = block.decode(pd)
  40. if err != nil {
  41. return err
  42. }
  43. r.Blocks[name][id] = block
  44. }
  45. }
  46. return nil
  47. }