join_group_request.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package sarama
// GroupProtocol pairs a group protocol name with its metadata bytes. It is the
// element type of JoinGroupRequest.OrderedGroupProtocols.
type GroupProtocol struct {
	// Name is the protocol name; it is read and written as a string.
	Name string
	// Metadata is the protocol's payload; it is read and written as a byte
	// array and is not interpreted by this type.
	Metadata []byte
}
  6. func (p *GroupProtocol) decode(pd packetDecoder) (err error) {
  7. p.Name, err = pd.getString()
  8. if err != nil {
  9. return err
  10. }
  11. p.Metadata, err = pd.getBytes()
  12. return err
  13. }
  14. func (p *GroupProtocol) encode(pe packetEncoder) (err error) {
  15. if err := pe.putString(p.Name); err != nil {
  16. return err
  17. }
  18. if err := pe.putBytes(p.Metadata); err != nil {
  19. return err
  20. }
  21. return nil
  22. }
// JoinGroupRequest carries the fields of a JoinGroup request: the group id,
// session timeout, member id, protocol type and a list of group protocols.
//
// The protocol list can be supplied through either GroupProtocols or
// OrderedGroupProtocols. encode rejects a request that populates both, and
// decode fills in both from the same wire data.
type JoinGroupRequest struct {
	GroupId        string
	SessionTimeout int32
	MemberId       string
	ProtocolType   string
	// Deprecated: use OrderedGroupProtocols instead. A map has no defined
	// iteration order, so protocols supplied here are encoded in an
	// unspecified order.
	GroupProtocols map[string][]byte
	// OrderedGroupProtocols lists the protocols in the order they are encoded.
	OrderedGroupProtocols []*GroupProtocol
}
  31. func (r *JoinGroupRequest) encode(pe packetEncoder) error {
  32. if err := pe.putString(r.GroupId); err != nil {
  33. return err
  34. }
  35. pe.putInt32(r.SessionTimeout)
  36. if err := pe.putString(r.MemberId); err != nil {
  37. return err
  38. }
  39. if err := pe.putString(r.ProtocolType); err != nil {
  40. return err
  41. }
  42. if len(r.GroupProtocols) > 0 {
  43. if len(r.OrderedGroupProtocols) > 0 {
  44. return PacketDecodingError{"cannot specify both GroupProtocols and OrderedGroupProtocols on JoinGroupRequest"}
  45. }
  46. if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil {
  47. return err
  48. }
  49. for name, metadata := range r.GroupProtocols {
  50. if err := pe.putString(name); err != nil {
  51. return err
  52. }
  53. if err := pe.putBytes(metadata); err != nil {
  54. return err
  55. }
  56. }
  57. } else {
  58. if err := pe.putArrayLength(len(r.OrderedGroupProtocols)); err != nil {
  59. return err
  60. }
  61. for _, protocol := range r.OrderedGroupProtocols {
  62. if err := protocol.encode(pe); err != nil {
  63. return err
  64. }
  65. }
  66. }
  67. return nil
  68. }
  69. func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) {
  70. if r.GroupId, err = pd.getString(); err != nil {
  71. return
  72. }
  73. if r.SessionTimeout, err = pd.getInt32(); err != nil {
  74. return
  75. }
  76. if r.MemberId, err = pd.getString(); err != nil {
  77. return
  78. }
  79. if r.ProtocolType, err = pd.getString(); err != nil {
  80. return
  81. }
  82. n, err := pd.getArrayLength()
  83. if err != nil {
  84. return err
  85. }
  86. if n == 0 {
  87. return nil
  88. }
  89. r.GroupProtocols = make(map[string][]byte)
  90. for i := 0; i < n; i++ {
  91. protocol := &GroupProtocol{}
  92. if err := protocol.decode(pd); err != nil {
  93. return err
  94. }
  95. r.GroupProtocols[protocol.Name] = protocol.Metadata
  96. r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, protocol)
  97. }
  98. return nil
  99. }
  100. func (r *JoinGroupRequest) key() int16 {
  101. return 11
  102. }
  103. func (r *JoinGroupRequest) version() int16 {
  104. return 0
  105. }
  106. func (r *JoinGroupRequest) requiredVersion() KafkaVersion {
  107. return V0_9_0_0
  108. }
  109. func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
  110. r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, &GroupProtocol{
  111. Name: name,
  112. Metadata: metadata,
  113. })
  114. }
  115. func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
  116. bin, err := encode(metadata, nil)
  117. if err != nil {
  118. return err
  119. }
  120. r.AddGroupProtocol(name, bin)
  121. return nil
  122. }