produce_response.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package kafka
  2. type ProduceResponsePartition struct {
  3. Err KError
  4. Offset int64
  5. }
  6. func (pr *ProduceResponsePartition) decode(pd packetDecoder) (err error) {
  7. pr.Err, err = pd.getError()
  8. if err != nil {
  9. return err
  10. }
  11. pr.Offset, err = pd.getInt64()
  12. if err != nil {
  13. return err
  14. }
  15. return nil
  16. }
  17. type ProduceResponse struct {
  18. Partitions map[*string]map[int32]*ProduceResponsePartition
  19. }
  20. func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
  21. numTopics, err := pd.getArrayCount()
  22. if err != nil {
  23. return err
  24. }
  25. pr.Partitions = make(map[*string]map[int32]*ProduceResponsePartition, numTopics)
  26. for i := 0; i < numTopics; i++ {
  27. name, err := pd.getString()
  28. if err != nil {
  29. return err
  30. }
  31. numPartitions, err := pd.getArrayCount()
  32. if err != nil {
  33. return err
  34. }
  35. pr.Partitions[name] = make(map[int32]*ProduceResponsePartition, numPartitions)
  36. for j := 0; j < numPartitions; j++ {
  37. id, err := pd.getInt32()
  38. if err != nil {
  39. return err
  40. }
  41. partition := new(ProduceResponsePartition)
  42. err = partition.decode(pd)
  43. if err != nil {
  44. return err
  45. }
  46. pr.Partitions[name][id] = partition
  47. }
  48. }
  49. return nil
  50. }