// join_group_response.go (2.1 KB)
  1. package sarama
  2. type JoinGroupResponse struct {
  3. Err KError
  4. GenerationId int32
  5. GroupProtocol string
  6. LeaderId string
  7. MemberId string
  8. Members map[string][]byte
  9. }
  10. func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) {
  11. members := make(map[string]ConsumerGroupMemberMetadata, len(r.Members))
  12. for id, bin := range r.Members {
  13. meta := new(ConsumerGroupMemberMetadata)
  14. if err := decode(bin, meta); err != nil {
  15. return nil, err
  16. }
  17. members[id] = *meta
  18. }
  19. return members, nil
  20. }
  21. func (r *JoinGroupResponse) encode(pe packetEncoder) error {
  22. pe.putInt16(int16(r.Err))
  23. pe.putInt32(r.GenerationId)
  24. if err := pe.putString(r.GroupProtocol); err != nil {
  25. return err
  26. }
  27. if err := pe.putString(r.LeaderId); err != nil {
  28. return err
  29. }
  30. if err := pe.putString(r.MemberId); err != nil {
  31. return err
  32. }
  33. if err := pe.putArrayLength(len(r.Members)); err != nil {
  34. return err
  35. }
  36. for memberId, memberMetadata := range r.Members {
  37. if err := pe.putString(memberId); err != nil {
  38. return err
  39. }
  40. if err := pe.putBytes(memberMetadata); err != nil {
  41. return err
  42. }
  43. }
  44. return nil
  45. }
  46. func (r *JoinGroupResponse) decode(pd packetDecoder, version int16) (err error) {
  47. kerr, err := pd.getInt16()
  48. if err != nil {
  49. return err
  50. }
  51. r.Err = KError(kerr)
  52. if r.GenerationId, err = pd.getInt32(); err != nil {
  53. return
  54. }
  55. if r.GroupProtocol, err = pd.getString(); err != nil {
  56. return
  57. }
  58. if r.LeaderId, err = pd.getString(); err != nil {
  59. return
  60. }
  61. if r.MemberId, err = pd.getString(); err != nil {
  62. return
  63. }
  64. n, err := pd.getArrayLength()
  65. if err != nil {
  66. return err
  67. }
  68. if n == 0 {
  69. return nil
  70. }
  71. r.Members = make(map[string][]byte)
  72. for i := 0; i < n; i++ {
  73. memberId, err := pd.getString()
  74. if err != nil {
  75. return err
  76. }
  77. memberMetadata, err := pd.getBytes()
  78. if err != nil {
  79. return err
  80. }
  81. r.Members[memberId] = memberMetadata
  82. }
  83. return nil
  84. }
  85. func (r *JoinGroupResponse) key() int16 {
  86. return 11
  87. }
  88. func (r *JoinGroupResponse) version() int16 {
  89. return 0
  90. }
  91. func (r *JoinGroupResponse) requiredVersion() KafkaVersion {
  92. return V0_9_0_0
  93. }