produce_response.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package protocol
  2. import enc "sarama/encoding"
  3. import "sarama/types"
  4. type ProduceResponseBlock struct {
  5. Err types.KError
  6. Offset int64
  7. }
  8. func (pr *ProduceResponseBlock) Decode(pd enc.PacketDecoder) (err error) {
  9. tmp, err := pd.GetInt16()
  10. if err != nil {
  11. return err
  12. }
  13. pr.Err = types.KError(tmp)
  14. pr.Offset, err = pd.GetInt64()
  15. if err != nil {
  16. return err
  17. }
  18. return nil
  19. }
  20. type ProduceResponse struct {
  21. Blocks map[string]map[int32]*ProduceResponseBlock
  22. }
  23. func (pr *ProduceResponse) Decode(pd enc.PacketDecoder) (err error) {
  24. numTopics, err := pd.GetArrayLength()
  25. if err != nil {
  26. return err
  27. }
  28. pr.Blocks = make(map[string]map[int32]*ProduceResponseBlock, numTopics)
  29. for i := 0; i < numTopics; i++ {
  30. name, err := pd.GetString()
  31. if err != nil {
  32. return err
  33. }
  34. numBlocks, err := pd.GetArrayLength()
  35. if err != nil {
  36. return err
  37. }
  38. pr.Blocks[name] = make(map[int32]*ProduceResponseBlock, numBlocks)
  39. for j := 0; j < numBlocks; j++ {
  40. id, err := pd.GetInt32()
  41. if err != nil {
  42. return err
  43. }
  44. block := new(ProduceResponseBlock)
  45. err = block.Decode(pd)
  46. if err != nil {
  47. return err
  48. }
  49. pr.Blocks[name][id] = block
  50. }
  51. }
  52. return nil
  53. }
  54. func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
  55. if pr.Blocks == nil {
  56. return nil
  57. }
  58. if pr.Blocks[topic] == nil {
  59. return nil
  60. }
  61. return pr.Blocks[topic][partition]
  62. }