|
|
@@ -1,11 +1,36 @@
|
|
|
package sarama
|
|
|
|
|
|
+type GroupProtocol struct {
|
|
|
+ Name string
|
|
|
+ Metadata []byte
|
|
|
+}
|
|
|
+
|
|
|
+func (p *GroupProtocol) decode(pd packetDecoder) (err error) {
|
|
|
+ p.Name, err = pd.getString()
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ p.Metadata, err = pd.getBytes()
|
|
|
+ return err
|
|
|
+}
|
|
|
+
|
|
|
+func (p *GroupProtocol) encode(pe packetEncoder) (err error) {
|
|
|
+ if err := pe.putString(p.Name); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if err := pe.putBytes(p.Metadata); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
type JoinGroupRequest struct {
|
|
|
- GroupId string
|
|
|
- SessionTimeout int32
|
|
|
- MemberId string
|
|
|
- ProtocolType string
|
|
|
- GroupProtocols map[string][]byte
|
|
|
+ GroupId string
|
|
|
+ SessionTimeout int32
|
|
|
+ MemberId string
|
|
|
+ ProtocolType string
|
|
|
+ GroupProtocols map[string][]byte // deprecated; use OrderedGroupProtocols
|
|
|
+ OrderedGroupProtocols []*GroupProtocol
|
|
|
}
|
|
|
|
|
|
func (r *JoinGroupRequest) encode(pe packetEncoder) error {
|
|
|
@@ -20,16 +45,31 @@ func (r *JoinGroupRequest) encode(pe packetEncoder) error {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- for name, metadata := range r.GroupProtocols {
|
|
|
- if err := pe.putString(name); err != nil {
|
|
|
+ if len(r.GroupProtocols) > 0 {
|
|
|
+ if len(r.OrderedGroupProtocols) > 0 {
|
|
|
+ return PacketDecodingError{"cannot specify both GroupProtocols and OrderedGroupProtocols on JoinGroupRequest"}
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- if err := pe.putBytes(metadata); err != nil {
|
|
|
+ for name, metadata := range r.GroupProtocols {
|
|
|
+ if err := pe.putString(name); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if err := pe.putBytes(metadata); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if err := pe.putArrayLength(len(r.OrderedGroupProtocols)); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+ for _, protocol := range r.OrderedGroupProtocols {
|
|
|
+ if err := protocol.encode(pe); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
@@ -62,16 +102,12 @@ func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) {
|
|
|
|
|
|
r.GroupProtocols = make(map[string][]byte)
|
|
|
for i := 0; i < n; i++ {
|
|
|
- name, err := pd.getString()
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- metadata, err := pd.getBytes()
|
|
|
- if err != nil {
|
|
|
+ protocol := &GroupProtocol{}
|
|
|
+ if err := protocol.decode(pd); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
-
|
|
|
- r.GroupProtocols[name] = metadata
|
|
|
+ r.GroupProtocols[protocol.Name] = protocol.Metadata
|
|
|
+ r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, protocol)
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
@@ -90,11 +126,10 @@ func (r *JoinGroupRequest) requiredVersion() KafkaVersion {
|
|
|
}
|
|
|
|
|
|
func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
|
|
|
- if r.GroupProtocols == nil {
|
|
|
- r.GroupProtocols = make(map[string][]byte)
|
|
|
- }
|
|
|
-
|
|
|
- r.GroupProtocols[name] = metadata
|
|
|
+ r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, &GroupProtocol{
|
|
|
+ Name: name,
|
|
|
+ Metadata: metadata,
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
|