Browse Source

Merge pull request #1174 from birdayz/fix_metadata_response_encode

fix encoding of MetadataResponse with version >= 2
Vlad Gorodetsky 7 years ago
parent
commit
434e22daac
2 changed files with 12 additions and 2 deletions
  1. 11 0
      metadata_response.go
  2. 1 2
      mocks/async_producer.go

+ 11 - 0
metadata_response.go

@@ -207,6 +207,10 @@ func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
 }
 }
 
 
 func (r *MetadataResponse) encode(pe packetEncoder) error {
 func (r *MetadataResponse) encode(pe packetEncoder) error {
+	if r.Version >= 3 {
+		pe.putInt32(r.ThrottleTimeMs)
+	}
+
 	err := pe.putArrayLength(len(r.Brokers))
 	err := pe.putArrayLength(len(r.Brokers))
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -218,6 +222,13 @@ func (r *MetadataResponse) encode(pe packetEncoder) error {
 		}
 		}
 	}
 	}
 
 
+	if r.Version >= 2 {
+		err := pe.putNullableString(r.ClusterID)
+		if err != nil {
+			return err
+		}
+	}
+
 	if r.Version >= 1 {
 	if r.Version >= 1 {
 		pe.putInt32(r.ControllerID)
 		pe.putInt32(r.ControllerID)
 	}
 	}

+ 1 - 2
mocks/async_producer.go

@@ -44,6 +44,7 @@ func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer {
 		defer func() {
 		defer func() {
 			close(mp.successes)
 			close(mp.successes)
 			close(mp.errors)
 			close(mp.errors)
+			close(mp.closed)
 		}()
 		}()
 
 
 		for msg := range mp.input {
 		for msg := range mp.input {
@@ -86,8 +87,6 @@ func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer {
 			mp.t.Errorf("Expected to exhaust all expectations, but %d are left.", len(mp.expectations))
 			mp.t.Errorf("Expected to exhaust all expectations, but %d are left.", len(mp.expectations))
 		}
 		}
 		mp.l.Unlock()
 		mp.l.Unlock()
-
-		close(mp.closed)
 	}()
 	}()
 
 
 	return mp
 	return mp