sticky_assignor_user_data.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package sarama
  2. type topicPartitionAssignment struct {
  3. Topic string
  4. Partition int32
  5. }
  6. type StickyAssignorUserData interface {
  7. partitions() []topicPartitionAssignment
  8. hasGeneration() bool
  9. generation() int
  10. }
  11. //StickyAssignorUserDataV0 holds topic partition information for an assignment
  12. type StickyAssignorUserDataV0 struct {
  13. Topics map[string][]int32
  14. topicPartitions []topicPartitionAssignment
  15. }
  16. func (m *StickyAssignorUserDataV0) encode(pe packetEncoder) error {
  17. if err := pe.putArrayLength(len(m.Topics)); err != nil {
  18. return err
  19. }
  20. for topic, partitions := range m.Topics {
  21. if err := pe.putString(topic); err != nil {
  22. return err
  23. }
  24. if err := pe.putInt32Array(partitions); err != nil {
  25. return err
  26. }
  27. }
  28. return nil
  29. }
  30. func (m *StickyAssignorUserDataV0) decode(pd packetDecoder) (err error) {
  31. var topicLen int
  32. if topicLen, err = pd.getArrayLength(); err != nil {
  33. return
  34. }
  35. m.Topics = make(map[string][]int32, topicLen)
  36. for i := 0; i < topicLen; i++ {
  37. var topic string
  38. if topic, err = pd.getString(); err != nil {
  39. return
  40. }
  41. if m.Topics[topic], err = pd.getInt32Array(); err != nil {
  42. return
  43. }
  44. }
  45. m.topicPartitions = populateTopicPartitions(m.Topics)
  46. return nil
  47. }
  48. func (m *StickyAssignorUserDataV0) partitions() []topicPartitionAssignment { return m.topicPartitions }
  49. func (m *StickyAssignorUserDataV0) hasGeneration() bool { return false }
  50. func (m *StickyAssignorUserDataV0) generation() int { return defaultGeneration }
  51. //StickyAssignorUserDataV1 holds topic partition information for an assignment
  52. type StickyAssignorUserDataV1 struct {
  53. Topics map[string][]int32
  54. Generation int32
  55. topicPartitions []topicPartitionAssignment
  56. }
  57. func (m *StickyAssignorUserDataV1) encode(pe packetEncoder) error {
  58. if err := pe.putArrayLength(len(m.Topics)); err != nil {
  59. return err
  60. }
  61. for topic, partitions := range m.Topics {
  62. if err := pe.putString(topic); err != nil {
  63. return err
  64. }
  65. if err := pe.putInt32Array(partitions); err != nil {
  66. return err
  67. }
  68. }
  69. pe.putInt32(m.Generation)
  70. return nil
  71. }
  72. func (m *StickyAssignorUserDataV1) decode(pd packetDecoder) (err error) {
  73. var topicLen int
  74. if topicLen, err = pd.getArrayLength(); err != nil {
  75. return
  76. }
  77. m.Topics = make(map[string][]int32, topicLen)
  78. for i := 0; i < topicLen; i++ {
  79. var topic string
  80. if topic, err = pd.getString(); err != nil {
  81. return
  82. }
  83. if m.Topics[topic], err = pd.getInt32Array(); err != nil {
  84. return
  85. }
  86. }
  87. m.Generation, err = pd.getInt32()
  88. if err != nil {
  89. return err
  90. }
  91. m.topicPartitions = populateTopicPartitions(m.Topics)
  92. return nil
  93. }
  94. func (m *StickyAssignorUserDataV1) partitions() []topicPartitionAssignment { return m.topicPartitions }
  95. func (m *StickyAssignorUserDataV1) hasGeneration() bool { return true }
  96. func (m *StickyAssignorUserDataV1) generation() int { return int(m.Generation) }
  97. func populateTopicPartitions(topics map[string][]int32) []topicPartitionAssignment {
  98. topicPartitions := make([]topicPartitionAssignment, 0)
  99. for topic, partitions := range topics {
  100. for _, partition := range partitions {
  101. topicPartitions = append(topicPartitions, topicPartitionAssignment{Topic: topic, Partition: partition})
  102. }
  103. }
  104. return topicPartitions
  105. }