consumer_group_members.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package sarama
  // ConsumerGroupMemberMetadata holds the metadata for a consumer group
  // member: a version number, a list of topics and a user-data payload.
  //
  // The encode and decode methods of this type process the fields in
  // declaration order.
  type ConsumerGroupMemberMetadata struct {
  	// Version is carried on the wire as a 16-bit integer.
  	Version int16
  	// Topics is carried on the wire as a string array.
  	Topics []string
  	// UserData is a raw byte payload; encode/decode pass it through as-is.
  	UserData []byte
  }
  8. func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
  9. pe.putInt16(m.Version)
  10. if err := pe.putStringArray(m.Topics); err != nil {
  11. return err
  12. }
  13. if err := pe.putBytes(m.UserData); err != nil {
  14. return err
  15. }
  16. return nil
  17. }
  18. func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
  19. if m.Version, err = pd.getInt16(); err != nil {
  20. return
  21. }
  22. if m.Topics, err = pd.getStringArray(); err != nil {
  23. return
  24. }
  25. if m.UserData, err = pd.getBytes(); err != nil {
  26. return
  27. }
  28. return nil
  29. }
  // ConsumerGroupMemberAssignment holds the member assignment for a consumer
  // group: a version number, a per-topic list of int32 values and a user-data
  // payload.
  type ConsumerGroupMemberAssignment struct {
  	// Version is carried on the wire as a 16-bit integer.
  	Version int16
  	// Topics maps a topic name to the int32 array stored for that topic
  	// (the encoder loop names these values "partitions").
  	Topics map[string][]int32
  	// UserData is a raw byte payload; encode/decode pass it through as-is.
  	UserData []byte
  }
  36. func (m *ConsumerGroupMemberAssignment) encode(pe packetEncoder) error {
  37. pe.putInt16(m.Version)
  38. if err := pe.putArrayLength(len(m.Topics)); err != nil {
  39. return err
  40. }
  41. for topic, partitions := range m.Topics {
  42. if err := pe.putString(topic); err != nil {
  43. return err
  44. }
  45. if err := pe.putInt32Array(partitions); err != nil {
  46. return err
  47. }
  48. }
  49. if err := pe.putBytes(m.UserData); err != nil {
  50. return err
  51. }
  52. return nil
  53. }
  54. func (m *ConsumerGroupMemberAssignment) decode(pd packetDecoder) (err error) {
  55. if m.Version, err = pd.getInt16(); err != nil {
  56. return
  57. }
  58. var topicLen int
  59. if topicLen, err = pd.getArrayLength(); err != nil {
  60. return
  61. }
  62. m.Topics = make(map[string][]int32, topicLen)
  63. for i := 0; i < topicLen; i++ {
  64. var topic string
  65. if topic, err = pd.getString(); err != nil {
  66. return
  67. }
  68. if m.Topics[topic], err = pd.getInt32Array(); err != nil {
  69. return
  70. }
  71. }
  72. if m.UserData, err = pd.getBytes(); err != nil {
  73. return
  74. }
  75. return nil
  76. }