Bläddra i källkod

Automatically construct a broker from the ConsumerMetadataResponse

Evan Huus 10 år sedan
förälder
incheckning
b0438f260a
1 ändrade filer med 18 tillägg och 8 borttagningar
  1. 18 8
      consumer_metadata_response.go

+ 18 - 8
consumer_metadata_response.go

@@ -1,10 +1,16 @@
 package sarama
 
+import (
+	"net"
+	"strconv"
+)
+
 type ConsumerMetadataResponse struct {
 	Err             KError
-	CoordinatorID   int32
-	CoordinatorHost string
-	CoordinatorPort int32
+	Coordinator     *Broker
+	CoordinatorID   int32  // deprecated: use Coordinator.ID()
+	CoordinatorHost string // deprecated: use Coordinator.Addr()
+	CoordinatorPort int32  // deprecated: use Coordinator.Addr()
 }
 
 func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
@@ -14,20 +20,24 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
 	}
 	r.Err = KError(tmp)
 
-	r.CoordinatorID, err = pd.getInt32()
-	if err != nil {
+	r.Coordinator = new(Broker)
+	if err := r.Coordinator.decode(pd); err != nil {
 		return err
 	}
 
-	r.CoordinatorHost, err = pd.getString()
+	// this can all go away in 2.0, but we have to fill in deprecated fields to maintain
+	// backwards compatibility
+	host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
 	if err != nil {
 		return err
 	}
-
-	r.CoordinatorPort, err = pd.getInt32()
+	port, err := strconv.ParseInt(portstr, 10, 32)
 	if err != nil {
 		return err
 	}
+	r.CoordinatorID = r.Coordinator.ID()
+	r.CoordinatorHost = host
+	r.CoordinatorPort = int32(port)
 
 	return nil
 }