join_group_request.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package sarama
  2. type GroupProtocol struct {
  3. Name string
  4. Metadata []byte
  5. }
  6. func (p *GroupProtocol) decode(pd packetDecoder) (err error) {
  7. p.Name, err = pd.getString()
  8. if err != nil {
  9. return err
  10. }
  11. p.Metadata, err = pd.getBytes()
  12. return err
  13. }
  14. func (p *GroupProtocol) encode(pe packetEncoder) (err error) {
  15. if err := pe.putString(p.Name); err != nil {
  16. return err
  17. }
  18. if err := pe.putBytes(p.Metadata); err != nil {
  19. return err
  20. }
  21. return nil
  22. }
  23. type JoinGroupRequest struct {
  24. Version int16
  25. GroupID string
  26. SessionTimeout int32
  27. RebalanceTimeout int32
  28. MemberID string
  29. ProtocolType string
  30. GroupProtocols map[string][]byte // deprecated; use OrderedGroupProtocols
  31. OrderedGroupProtocols []*GroupProtocol
  32. }
  33. func (r *JoinGroupRequest) encode(pe packetEncoder) error {
  34. if err := pe.putString(r.GroupID); err != nil {
  35. return err
  36. }
  37. pe.putInt32(r.SessionTimeout)
  38. if r.Version >= 1 {
  39. pe.putInt32(r.RebalanceTimeout)
  40. }
  41. if err := pe.putString(r.MemberID); err != nil {
  42. return err
  43. }
  44. if err := pe.putString(r.ProtocolType); err != nil {
  45. return err
  46. }
  47. if len(r.GroupProtocols) > 0 {
  48. if len(r.OrderedGroupProtocols) > 0 {
  49. return PacketDecodingError{"cannot specify both GroupProtocols and OrderedGroupProtocols on JoinGroupRequest"}
  50. }
  51. if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil {
  52. return err
  53. }
  54. for name, metadata := range r.GroupProtocols {
  55. if err := pe.putString(name); err != nil {
  56. return err
  57. }
  58. if err := pe.putBytes(metadata); err != nil {
  59. return err
  60. }
  61. }
  62. } else {
  63. if err := pe.putArrayLength(len(r.OrderedGroupProtocols)); err != nil {
  64. return err
  65. }
  66. for _, protocol := range r.OrderedGroupProtocols {
  67. if err := protocol.encode(pe); err != nil {
  68. return err
  69. }
  70. }
  71. }
  72. return nil
  73. }
  74. func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) {
  75. r.Version = version
  76. if r.GroupID, err = pd.getString(); err != nil {
  77. return
  78. }
  79. if r.SessionTimeout, err = pd.getInt32(); err != nil {
  80. return
  81. }
  82. if version >= 1 {
  83. if r.RebalanceTimeout, err = pd.getInt32(); err != nil {
  84. return err
  85. }
  86. }
  87. if r.MemberID, err = pd.getString(); err != nil {
  88. return
  89. }
  90. if r.ProtocolType, err = pd.getString(); err != nil {
  91. return
  92. }
  93. n, err := pd.getArrayLength()
  94. if err != nil {
  95. return err
  96. }
  97. if n == 0 {
  98. return nil
  99. }
  100. r.GroupProtocols = make(map[string][]byte)
  101. for i := 0; i < n; i++ {
  102. protocol := &GroupProtocol{}
  103. if err := protocol.decode(pd); err != nil {
  104. return err
  105. }
  106. r.GroupProtocols[protocol.Name] = protocol.Metadata
  107. r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, protocol)
  108. }
  109. return nil
  110. }
  111. func (r *JoinGroupRequest) key() int16 {
  112. return 11
  113. }
  114. func (r *JoinGroupRequest) version() int16 {
  115. return r.Version
  116. }
  117. func (r *JoinGroupRequest) requiredVersion() KafkaVersion {
  118. switch r.Version {
  119. case 2:
  120. return V0_11_0_0
  121. case 1:
  122. return V0_10_1_0
  123. default:
  124. return V0_9_0_0
  125. }
  126. }
  127. func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
  128. r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, &GroupProtocol{
  129. Name: name,
  130. Metadata: metadata,
  131. })
  132. }
  133. func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
  134. bin, err := encode(metadata, nil)
  135. if err != nil {
  136. return err
  137. }
  138. r.AddGroupProtocol(name, bin)
  139. return nil
  140. }