list_partition_reassignments_request.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package sarama
  2. type ListPartitionReassignmentsRequest struct {
  3. TimeoutMs int32
  4. blocks map[string][]int32
  5. Version int16
  6. }
  7. func (r *ListPartitionReassignmentsRequest) encode(pe packetEncoder) error {
  8. pe.putInt32(r.TimeoutMs)
  9. pe.putCompactArrayLength(len(r.blocks))
  10. for topic, partitions := range r.blocks {
  11. if err := pe.putCompactString(topic); err != nil {
  12. return err
  13. }
  14. if err := pe.putCompactInt32Array(partitions); err != nil {
  15. return err
  16. }
  17. pe.putEmptyTaggedFieldArray()
  18. }
  19. pe.putEmptyTaggedFieldArray()
  20. return nil
  21. }
  22. func (r *ListPartitionReassignmentsRequest) decode(pd packetDecoder, version int16) (err error) {
  23. r.Version = version
  24. if r.TimeoutMs, err = pd.getInt32(); err != nil {
  25. return err
  26. }
  27. topicCount, err := pd.getCompactArrayLength()
  28. if err != nil {
  29. return err
  30. }
  31. if topicCount > 0 {
  32. r.blocks = make(map[string][]int32)
  33. for i := 0; i < topicCount; i++ {
  34. topic, err := pd.getCompactString()
  35. if err != nil {
  36. return err
  37. }
  38. partitionCount, err := pd.getCompactArrayLength()
  39. if err != nil {
  40. return err
  41. }
  42. r.blocks[topic] = make([]int32, partitionCount)
  43. for j := 0; j < partitionCount; j++ {
  44. partition, err := pd.getInt32()
  45. if err != nil {
  46. return err
  47. }
  48. r.blocks[topic][j] = partition
  49. }
  50. if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
  51. return err
  52. }
  53. }
  54. }
  55. if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
  56. return err
  57. }
  58. return
  59. }
  60. func (r *ListPartitionReassignmentsRequest) key() int16 {
  61. return 46
  62. }
  63. func (r *ListPartitionReassignmentsRequest) version() int16 {
  64. return r.Version
  65. }
  66. func (r *ListPartitionReassignmentsRequest) headerVersion() int16 {
  67. return 2
  68. }
  69. func (r *ListPartitionReassignmentsRequest) requiredVersion() KafkaVersion {
  70. return V2_4_0_0
  71. }
  72. func (r *ListPartitionReassignmentsRequest) AddBlock(topic string, partitionIDs []int32) {
  73. if r.blocks == nil {
  74. r.blocks = make(map[string][]int32)
  75. }
  76. if r.blocks[topic] == nil {
  77. r.blocks[topic] = partitionIDs
  78. }
  79. }