123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- package sarama
// GroupProtocol is one named protocol entry of a JoinGroupRequest: a protocol
// name paired with the metadata bytes that accompany it.
type GroupProtocol struct {
	// Name is the protocol's name; it is encoded and decoded as a string.
	Name string
	// Metadata is the payload attached to the protocol. It is encoded and
	// decoded as a raw byte array and is not interpreted by this type.
	Metadata []byte
}
- func (p *GroupProtocol) decode(pd packetDecoder) (err error) {
- p.Name, err = pd.getString()
- if err != nil {
- return err
- }
- p.Metadata, err = pd.getBytes()
- return err
- }
- func (p *GroupProtocol) encode(pe packetEncoder) (err error) {
- if err := pe.putString(p.Name); err != nil {
- return err
- }
- if err := pe.putBytes(p.Metadata); err != nil {
- return err
- }
- return nil
- }
- type JoinGroupRequest struct {
- Version int16
- GroupId string
- SessionTimeout int32
- RebalanceTimeout int32
- MemberId string
- ProtocolType string
- GroupProtocols map[string][]byte // deprecated; use OrderedGroupProtocols
- OrderedGroupProtocols []*GroupProtocol
- }
- func (r *JoinGroupRequest) encode(pe packetEncoder) error {
- if err := pe.putString(r.GroupId); err != nil {
- return err
- }
- pe.putInt32(r.SessionTimeout)
- if r.Version >= 1 {
- pe.putInt32(r.RebalanceTimeout)
- }
- if err := pe.putString(r.MemberId); err != nil {
- return err
- }
- if err := pe.putString(r.ProtocolType); err != nil {
- return err
- }
- if len(r.GroupProtocols) > 0 {
- if len(r.OrderedGroupProtocols) > 0 {
- return PacketDecodingError{"cannot specify both GroupProtocols and OrderedGroupProtocols on JoinGroupRequest"}
- }
- if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil {
- return err
- }
- for name, metadata := range r.GroupProtocols {
- if err := pe.putString(name); err != nil {
- return err
- }
- if err := pe.putBytes(metadata); err != nil {
- return err
- }
- }
- } else {
- if err := pe.putArrayLength(len(r.OrderedGroupProtocols)); err != nil {
- return err
- }
- for _, protocol := range r.OrderedGroupProtocols {
- if err := protocol.encode(pe); err != nil {
- return err
- }
- }
- }
- return nil
- }
- func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) {
- r.Version = version
- if r.GroupId, err = pd.getString(); err != nil {
- return
- }
- if r.SessionTimeout, err = pd.getInt32(); err != nil {
- return
- }
- if version >= 1 {
- if r.RebalanceTimeout, err = pd.getInt32(); err != nil {
- return err
- }
- }
- if r.MemberId, err = pd.getString(); err != nil {
- return
- }
- if r.ProtocolType, err = pd.getString(); err != nil {
- return
- }
- n, err := pd.getArrayLength()
- if err != nil {
- return err
- }
- if n == 0 {
- return nil
- }
- r.GroupProtocols = make(map[string][]byte)
- for i := 0; i < n; i++ {
- protocol := &GroupProtocol{}
- if err := protocol.decode(pd); err != nil {
- return err
- }
- r.GroupProtocols[protocol.Name] = protocol.Metadata
- r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, protocol)
- }
- return nil
- }
- func (r *JoinGroupRequest) key() int16 {
- return 11
- }
- func (r *JoinGroupRequest) version() int16 {
- return r.Version
- }
- func (r *JoinGroupRequest) headerVersion() int16 {
- return 1
- }
- func (r *JoinGroupRequest) requiredVersion() KafkaVersion {
- switch r.Version {
- case 2:
- return V0_11_0_0
- case 1:
- return V0_10_1_0
- default:
- return V0_9_0_0
- }
- }
- func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
- r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, &GroupProtocol{
- Name: name,
- Metadata: metadata,
- })
- }
- func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
- bin, err := encode(metadata, nil)
- if err != nil {
- return err
- }
- r.AddGroupProtocol(name, bin)
- return nil
- }
|