Browse Source

Added version to broker.encode()

Mickael Maison 7 years ago
parent
commit
2e5395ba98
3 changed files with 11 additions and 4 deletions
  1. 8 1
      broker.go
  2. 2 2
      find_coordinator_response.go
  3. 1 1
      metadata_response.go

+ 8 - 1
broker.go

@@ -624,7 +624,7 @@ func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
 	return nil
 }
 
-func (b *Broker) encode(pe packetEncoder) (err error) {
+func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
 
 	host, portstr, err := net.SplitHostPort(b.addr)
 	if err != nil {
@@ -644,6 +644,13 @@ func (b *Broker) encode(pe packetEncoder) (err error) {
 
 	pe.putInt32(int32(port))
 
+	if version >= 1 {
+		err = pe.putNullableString(b.rack)
+		if err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
 

+ 2 - 2
find_coordinator_response.go

@@ -36,7 +36,7 @@ func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err e
 	}
 
 	coordinator := new(Broker)
-	if err := coordinator.decode(pd); err != nil {
+	if err := coordinator.decode(pd, 0); err != nil {
 		return err
 	}
 	if coordinator.addr == ":0" {
@@ -60,7 +60,7 @@ func (f *FindCoordinatorResponse) encode(pe packetEncoder) error {
 		}
 	}
 
-	if err := f.Coordinator.encode(pe); err != nil {
+	if err := f.Coordinator.encode(pe, 0); err != nil {
 		return err
 	}
 

+ 1 - 1
metadata_response.go

@@ -179,7 +179,7 @@ func (r *MetadataResponse) encode(pe packetEncoder) error {
 		return err
 	}
 	for _, broker := range r.Brokers {
-		err = broker.encode(pe)
+		err = broker.encode(pe, r.Version)
 		if err != nil {
 			return err
 		}