// produce_response.go (1.3 KB)

package protocol
  2. type ProduceResponseBlock struct {
  3. Err KError
  4. Offset int64
  5. }
  6. func (pr *ProduceResponseBlock) decode(pd packetDecoder) (err error) {
  7. pr.Err, err = pd.getError()
  8. if err != nil {
  9. return err
  10. }
  11. pr.Offset, err = pd.getInt64()
  12. if err != nil {
  13. return err
  14. }
  15. return nil
  16. }
  17. type ProduceResponse struct {
  18. Blocks map[string]map[int32]*ProduceResponseBlock
  19. }
  20. func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
  21. numTopics, err := pd.getArrayCount()
  22. if err != nil {
  23. return err
  24. }
  25. pr.Blocks = make(map[string]map[int32]*ProduceResponseBlock, numTopics)
  26. for i := 0; i < numTopics; i++ {
  27. name, err := pd.getString()
  28. if err != nil {
  29. return err
  30. }
  31. numBlocks, err := pd.getArrayCount()
  32. if err != nil {
  33. return err
  34. }
  35. pr.Blocks[name] = make(map[int32]*ProduceResponseBlock, numBlocks)
  36. for j := 0; j < numBlocks; j++ {
  37. id, err := pd.getInt32()
  38. if err != nil {
  39. return err
  40. }
  41. block := new(ProduceResponseBlock)
  42. err = block.decode(pd)
  43. if err != nil {
  44. return err
  45. }
  46. pr.Blocks[name][id] = block
  47. }
  48. }
  49. return nil
  50. }
  51. func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
  52. if pr.Blocks == nil {
  53. return nil
  54. }
  55. if pr.Blocks[topic] == nil {
  56. return nil
  57. }
  58. return pr.Blocks[topic][partition]
  59. }