join_group_request.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package sarama
  // JoinGroupRequest holds the fields that encode writes to, and decode reads
  // from, the wire for a join-group request (API key 11, version 0).
  type JoinGroupRequest struct {
  	// GroupId is the first string written on the wire.
  	GroupId string
  	// SessionTimeout is sent as a 32-bit integer.
  	// NOTE(review): presumably milliseconds — confirm against the Kafka protocol docs.
  	SessionTimeout int32
  	// MemberId is the string written after the session timeout.
  	MemberId string
  	// ProtocolType is the string written after the member id.
  	ProtocolType string
  	// GroupProtocols maps a protocol name to its opaque metadata bytes.
  	// It stays nil until a protocol is added or a non-empty array is decoded.
  	GroupProtocols map[string][]byte
  }
  9. func (r *JoinGroupRequest) encode(pe packetEncoder) error {
  10. if err := pe.putString(r.GroupId); err != nil {
  11. return err
  12. }
  13. pe.putInt32(r.SessionTimeout)
  14. if err := pe.putString(r.MemberId); err != nil {
  15. return err
  16. }
  17. if err := pe.putString(r.ProtocolType); err != nil {
  18. return err
  19. }
  20. if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil {
  21. return err
  22. }
  23. for name, metadata := range r.GroupProtocols {
  24. if err := pe.putString(name); err != nil {
  25. return err
  26. }
  27. if err := pe.putBytes(metadata); err != nil {
  28. return err
  29. }
  30. }
  31. return nil
  32. }
  33. func (r *JoinGroupRequest) decode(pd packetDecoder) (err error) {
  34. if r.GroupId, err = pd.getString(); err != nil {
  35. return
  36. }
  37. if r.SessionTimeout, err = pd.getInt32(); err != nil {
  38. return
  39. }
  40. if r.MemberId, err = pd.getString(); err != nil {
  41. return
  42. }
  43. if r.ProtocolType, err = pd.getString(); err != nil {
  44. return
  45. }
  46. n, err := pd.getArrayLength()
  47. if err != nil {
  48. return err
  49. }
  50. if n == 0 {
  51. return nil
  52. }
  53. r.GroupProtocols = make(map[string][]byte)
  54. for i := 0; i < n; i++ {
  55. name, err := pd.getString()
  56. if err != nil {
  57. return err
  58. }
  59. metadata, err := pd.getBytes()
  60. if err != nil {
  61. return err
  62. }
  63. r.GroupProtocols[name] = metadata
  64. }
  65. return nil
  66. }
  67. func (r *JoinGroupRequest) key() int16 {
  68. return 11
  69. }
  70. func (r *JoinGroupRequest) version() int16 {
  71. return 0
  72. }
  73. func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
  74. if r.GroupProtocols == nil {
  75. r.GroupProtocols = make(map[string][]byte)
  76. }
  77. r.GroupProtocols[name] = metadata
  78. }
  79. func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
  80. bin, err := encode(metadata)
  81. if err != nil {
  82. return err
  83. }
  84. r.AddGroupProtocol(name, bin)
  85. return nil
  86. }