add_partitions_to_txn_response.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package sarama
  2. import (
  3. "time"
  4. )
  5. type AddPartitionsToTxnResponse struct {
  6. ThrottleTime time.Duration
  7. Errors map[string][]*PartitionError
  8. }
  9. func (a *AddPartitionsToTxnResponse) encode(pe packetEncoder) error {
  10. pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
  11. if err := pe.putArrayLength(len(a.Errors)); err != nil {
  12. return err
  13. }
  14. for topic, e := range a.Errors {
  15. if err := pe.putString(topic); err != nil {
  16. return err
  17. }
  18. if err := pe.putArrayLength(len(e)); err != nil {
  19. return err
  20. }
  21. for _, partitionError := range e {
  22. if err := partitionError.encode(pe); err != nil {
  23. return err
  24. }
  25. }
  26. }
  27. return nil
  28. }
  29. func (a *AddPartitionsToTxnResponse) decode(pd packetDecoder, version int16) (err error) {
  30. throttleTime, err := pd.getInt32()
  31. if err != nil {
  32. return err
  33. }
  34. a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
  35. n, err := pd.getArrayLength()
  36. if err != nil {
  37. return err
  38. }
  39. a.Errors = make(map[string][]*PartitionError)
  40. for i := 0; i < n; i++ {
  41. topic, err := pd.getString()
  42. if err != nil {
  43. return err
  44. }
  45. m, err := pd.getArrayLength()
  46. if err != nil {
  47. return err
  48. }
  49. a.Errors[topic] = make([]*PartitionError, m)
  50. for j := 0; j < m; j++ {
  51. a.Errors[topic][j] = new(PartitionError)
  52. if err := a.Errors[topic][j].decode(pd, version); err != nil {
  53. return err
  54. }
  55. }
  56. }
  57. return nil
  58. }
  59. func (a *AddPartitionsToTxnResponse) key() int16 {
  60. return 24
  61. }
  62. func (a *AddPartitionsToTxnResponse) version() int16 {
  63. return 0
  64. }
  65. func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
  66. return V0_11_0_0
  67. }
  68. type PartitionError struct {
  69. Partition int32
  70. Err KError
  71. }
  72. func (p *PartitionError) encode(pe packetEncoder) error {
  73. pe.putInt32(p.Partition)
  74. pe.putInt16(int16(p.Err))
  75. return nil
  76. }
  77. func (p *PartitionError) decode(pd packetDecoder, version int16) (err error) {
  78. if p.Partition, err = pd.getInt32(); err != nil {
  79. return err
  80. }
  81. kerr, err := pd.getInt16()
  82. if err != nil {
  83. return err
  84. }
  85. p.Err = KError(kerr)
  86. return nil
  87. }