|
|
@@ -1,10 +1,16 @@
|
|
|
package sarama
|
|
|
|
|
|
+import (
|
|
|
+ "net"
|
|
|
+ "strconv"
|
|
|
+)
|
|
|
+
|
|
|
type ConsumerMetadataResponse struct {
|
|
|
Err KError
|
|
|
- CoordinatorID int32
|
|
|
- CoordinatorHost string
|
|
|
- CoordinatorPort int32
|
|
|
+ Coordinator *Broker
|
|
|
+ CoordinatorID int32 // deprecated: use Coordinator.ID()
|
|
|
+ CoordinatorHost string // deprecated: use Coordinator.Addr()
|
|
|
+ CoordinatorPort int32 // deprecated: use Coordinator.Addr()
|
|
|
}
|
|
|
|
|
|
func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
|
|
|
@@ -14,20 +20,24 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
|
|
|
}
|
|
|
r.Err = KError(tmp)
|
|
|
|
|
|
- r.CoordinatorID, err = pd.getInt32()
|
|
|
- if err != nil {
|
|
|
+ r.Coordinator = new(Broker)
|
|
|
+ if err := r.Coordinator.decode(pd); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- r.CoordinatorHost, err = pd.getString()
|
|
|
+ // this can all go away in 2.0, but we have to fill in deprecated fields to maintain
|
|
|
+ // backwards compatibility
|
|
|
+ host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
-
|
|
|
- r.CoordinatorPort, err = pd.getInt32()
|
|
|
+ port, err := strconv.ParseInt(portstr, 10, 32)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+ r.CoordinatorID = r.Coordinator.ID()
|
|
|
+ r.CoordinatorHost = host
|
|
|
+ r.CoordinatorPort = int32(port)
|
|
|
|
|
|
return nil
|
|
|
}
|