@@ -132,6 +132,18 @@ func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*Metada
return response, nil
}
+func (b *Broker) GetConsumerMetadata(clientID string, request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
+ response := new(ConsumerMetadataResponse)
+
+ err := b.sendAndReceive(clientID, request, response)
+ if err != nil {
+ return nil, err
+ }
+ return response, nil
+}
func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*OffsetResponse, error) {
response := new(OffsetResponse)
@@ -90,6 +90,18 @@ var brokerTestTable = []struct {
}},
+ {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
+ func(t *testing.T, broker *Broker) {
+ request := ConsumerMetadataRequest{}
+ response, err := broker.GetConsumerMetadata("clientID", &request)
+ t.Error(err)
+ if response == nil {
+ t.Error("Consumer Metadata request got no response!")
+ }},
{[]byte{},
func(t *testing.T, broker *Broker) {
request := ProduceRequest{}