produce_response.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package kafka
  2. type ProduceResponsePartitionBlock struct {
  3. Id int32
  4. Err KError
  5. Offset int64
  6. }
  7. func (pr *ProduceResponsePartitionBlock) decode(pd packetDecoder) (err error) {
  8. pr.Id, err = pd.getInt32()
  9. if err != nil {
  10. return err
  11. }
  12. pr.Err, err = pd.getError()
  13. if err != nil {
  14. return err
  15. }
  16. pr.Offset, err = pd.getInt64()
  17. if err != nil {
  18. return err
  19. }
  20. return nil
  21. }
  22. type ProduceResponseTopicBlock struct {
  23. Name *string
  24. Partitions []ProduceResponsePartitionBlock
  25. }
  26. func (pr *ProduceResponseTopicBlock) decode(pd packetDecoder) (err error) {
  27. pr.Name, err = pd.getString()
  28. if err != nil {
  29. return err
  30. }
  31. n, err := pd.getArrayCount()
  32. if err != nil {
  33. return err
  34. }
  35. pr.Partitions = make([]ProduceResponsePartitionBlock, n)
  36. for i := range pr.Partitions {
  37. err = (&pr.Partitions[i]).decode(pd)
  38. if err != nil {
  39. return err
  40. }
  41. }
  42. return nil
  43. }
  44. type ProduceResponse struct {
  45. Topics []ProduceResponseTopicBlock
  46. }
  47. func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
  48. n, err := pd.getArrayCount()
  49. if err != nil {
  50. return err
  51. }
  52. pr.Topics = make([]ProduceResponseTopicBlock, n)
  53. for i := range pr.Topics {
  54. err = (&pr.Topics[i]).decode(pd)
  55. if err != nil {
  56. return err
  57. }
  58. }
  59. return nil
  60. }