12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- package sarama
- import (
- "net"
- "strconv"
- )
- //ConsumerMetadataResponse holds the response for a consumer group meta data requests
- type ConsumerMetadataResponse struct {
- Err KError
- Coordinator *Broker
- CoordinatorID int32 // deprecated: use Coordinator.ID()
- CoordinatorHost string // deprecated: use Coordinator.Addr()
- CoordinatorPort int32 // deprecated: use Coordinator.Addr()
- }
- func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) {
- tmp := new(FindCoordinatorResponse)
- if err := tmp.decode(pd, version); err != nil {
- return err
- }
- r.Err = tmp.Err
- r.Coordinator = tmp.Coordinator
- if tmp.Coordinator == nil {
- return nil
- }
- // this can all go away in 2.0, but we have to fill in deprecated fields to maintain
- // backwards compatibility
- host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
- if err != nil {
- return err
- }
- port, err := strconv.ParseInt(portstr, 10, 32)
- if err != nil {
- return err
- }
- r.CoordinatorID = r.Coordinator.ID()
- r.CoordinatorHost = host
- r.CoordinatorPort = int32(port)
- return nil
- }
- func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
- if r.Coordinator == nil {
- r.Coordinator = new(Broker)
- r.Coordinator.id = r.CoordinatorID
- r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort)))
- }
- tmp := &FindCoordinatorResponse{
- Version: 0,
- Err: r.Err,
- Coordinator: r.Coordinator,
- }
- if err := tmp.encode(pe); err != nil {
- return err
- }
- return nil
- }
- func (r *ConsumerMetadataResponse) key() int16 {
- return 10
- }
- func (r *ConsumerMetadataResponse) version() int16 {
- return 0
- }
- func (r *ConsumerMetadataResponse) headerVersion() int16 {
- return 0
- }
- func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion {
- return V0_8_2_0
- }
|