produce_response.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package sarama
  2. type ProduceResponseBlock struct {
  3. Err KError
  4. Offset int64
  5. }
  6. func (pr *ProduceResponseBlock) decode(pd packetDecoder) (err error) {
  7. tmp, err := pd.getInt16()
  8. if err != nil {
  9. return err
  10. }
  11. pr.Err = KError(tmp)
  12. pr.Offset, err = pd.getInt64()
  13. if err != nil {
  14. return err
  15. }
  16. return nil
  17. }
  18. type ProduceResponse struct {
  19. Blocks map[string]map[int32]*ProduceResponseBlock
  20. }
  21. func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
  22. numTopics, err := pd.getArrayLength()
  23. if err != nil {
  24. return err
  25. }
  26. pr.Blocks = make(map[string]map[int32]*ProduceResponseBlock, numTopics)
  27. for i := 0; i < numTopics; i++ {
  28. name, err := pd.getString()
  29. if err != nil {
  30. return err
  31. }
  32. numBlocks, err := pd.getArrayLength()
  33. if err != nil {
  34. return err
  35. }
  36. pr.Blocks[name] = make(map[int32]*ProduceResponseBlock, numBlocks)
  37. for j := 0; j < numBlocks; j++ {
  38. id, err := pd.getInt32()
  39. if err != nil {
  40. return err
  41. }
  42. block := new(ProduceResponseBlock)
  43. err = block.decode(pd)
  44. if err != nil {
  45. return err
  46. }
  47. pr.Blocks[name][id] = block
  48. }
  49. }
  50. return nil
  51. }
  52. func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
  53. if pr.Blocks == nil {
  54. return nil
  55. }
  56. if pr.Blocks[topic] == nil {
  57. return nil
  58. }
  59. return pr.Blocks[topic][partition]
  60. }