alter_partition_reassignments_response.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package sarama
// alterPartitionReassignmentsErrorBlock is the per-partition result stored in
// AlterPartitionReassignmentsResponse.Errors: an error code plus an optional
// message.
type alterPartitionReassignmentsErrorBlock struct {
	// errorCode is the partition-level error; it travels on the wire as an int16.
	errorCode KError
	// errorMessage is the optional partition-level message. It is written with
	// the nullable compact string encoding, so nil is a valid value.
	errorMessage *string
}
  6. func (b *alterPartitionReassignmentsErrorBlock) encode(pe packetEncoder) error {
  7. pe.putInt16(int16(b.errorCode))
  8. if err := pe.putNullableCompactString(b.errorMessage); err != nil {
  9. return err
  10. }
  11. pe.putEmptyTaggedFieldArray()
  12. return nil
  13. }
  14. func (b *alterPartitionReassignmentsErrorBlock) decode(pd packetDecoder) (err error) {
  15. errorCode, err := pd.getInt16()
  16. if err != nil {
  17. return err
  18. }
  19. b.errorCode = KError(errorCode)
  20. b.errorMessage, err = pd.getCompactNullableString()
  21. if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
  22. return err
  23. }
  24. return err
  25. }
// AlterPartitionReassignmentsResponse is the decoded form of an
// AlterPartitionReassignments response: a top-level status plus a
// per-topic, per-partition map of results.
type AlterPartitionReassignmentsResponse struct {
	// Version is the protocol version; decode stores the version it was
	// called with here and version() returns it.
	Version int16
	// ThrottleTimeMs is carried on the wire as an int32.
	// NOTE(review): presumably the broker throttle time in milliseconds, going
	// by the name — confirm against the protocol definition.
	ThrottleTimeMs int32
	// ErrorCode is the top-level error code, carried on the wire as an int16.
	ErrorCode KError
	// ErrorMessage is the optional top-level message; nil is a valid value.
	ErrorMessage *string
	// Errors maps topic name to partition number to that partition's result.
	// It stays nil until AddError is called or decode reads at least one topic.
	Errors map[string]map[int32]*alterPartitionReassignmentsErrorBlock
}
  33. func (r *AlterPartitionReassignmentsResponse) AddError(topic string, partition int32, kerror KError, message *string) {
  34. if r.Errors == nil {
  35. r.Errors = make(map[string]map[int32]*alterPartitionReassignmentsErrorBlock)
  36. }
  37. partitions := r.Errors[topic]
  38. if partitions == nil {
  39. partitions = make(map[int32]*alterPartitionReassignmentsErrorBlock)
  40. r.Errors[topic] = partitions
  41. }
  42. partitions[partition] = &alterPartitionReassignmentsErrorBlock{errorCode: kerror, errorMessage: message}
  43. }
  44. func (r *AlterPartitionReassignmentsResponse) encode(pe packetEncoder) error {
  45. pe.putInt32(r.ThrottleTimeMs)
  46. pe.putInt16(int16(r.ErrorCode))
  47. if err := pe.putNullableCompactString(r.ErrorMessage); err != nil {
  48. return err
  49. }
  50. pe.putCompactArrayLength(len(r.Errors))
  51. for topic, partitions := range r.Errors {
  52. if err := pe.putCompactString(topic); err != nil {
  53. return err
  54. }
  55. pe.putCompactArrayLength(len(partitions))
  56. for partition, block := range partitions {
  57. pe.putInt32(partition)
  58. if err := block.encode(pe); err != nil {
  59. return err
  60. }
  61. }
  62. pe.putEmptyTaggedFieldArray()
  63. }
  64. pe.putEmptyTaggedFieldArray()
  65. return nil
  66. }
  67. func (r *AlterPartitionReassignmentsResponse) decode(pd packetDecoder, version int16) (err error) {
  68. r.Version = version
  69. if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
  70. return err
  71. }
  72. kerr, err := pd.getInt16()
  73. if err != nil {
  74. return err
  75. }
  76. r.ErrorCode = KError(kerr)
  77. if r.ErrorMessage, err = pd.getCompactNullableString(); err != nil {
  78. return err
  79. }
  80. numTopics, err := pd.getCompactArrayLength()
  81. if err != nil {
  82. return err
  83. }
  84. if numTopics > 0 {
  85. r.Errors = make(map[string]map[int32]*alterPartitionReassignmentsErrorBlock, numTopics)
  86. for i := 0; i < numTopics; i++ {
  87. topic, err := pd.getCompactString()
  88. if err != nil {
  89. return err
  90. }
  91. ongoingPartitionReassignments, err := pd.getCompactArrayLength()
  92. if err != nil {
  93. return err
  94. }
  95. r.Errors[topic] = make(map[int32]*alterPartitionReassignmentsErrorBlock, ongoingPartitionReassignments)
  96. for j := 0; j < ongoingPartitionReassignments; j++ {
  97. partition, err := pd.getInt32()
  98. if err != nil {
  99. return err
  100. }
  101. block := &alterPartitionReassignmentsErrorBlock{}
  102. if err := block.decode(pd); err != nil {
  103. return err
  104. }
  105. r.Errors[topic][partition] = block
  106. }
  107. if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
  108. return err
  109. }
  110. }
  111. }
  112. if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
  113. return err
  114. }
  115. return nil
  116. }
  117. func (r *AlterPartitionReassignmentsResponse) key() int16 {
  118. return 45
  119. }
// version returns the protocol version held in r.Version, which decode sets
// from its version argument.
func (r *AlterPartitionReassignmentsResponse) version() int16 {
	return r.Version
}
  123. func (r *AlterPartitionReassignmentsResponse) headerVersion() int16 {
  124. return 1
  125. }
// requiredVersion returns V2_4_0_0 regardless of r.Version.
// NOTE(review): presumably the minimum Kafka version that supports this
// message, going by the method name — confirm against callers.
func (r *AlterPartitionReassignmentsResponse) requiredVersion() KafkaVersion {
	return V2_4_0_0
}