浏览代码

Merge pull request #86 from eapache/consumer-metadata

Implement consumer metadata request/response pair
Willem van Bergen 11 年之前
父节点
当前提交
da8e7da3d5
共有 6 个文件被更改,包括 153 次插入0 次删除
  1. 12 0
      broker.go
  2. 12 0
      broker_test.go
  3. 17 0
      consumer_metadata_request.go
  4. 19 0
      consumer_metadata_request_test.go
  5. 35 0
      consumer_metadata_response.go
  6. 58 0
      consumer_metadata_response_test.go

+ 12 - 0
broker.go

@@ -132,6 +132,18 @@ func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*Metada
 	return response, nil
 }
 
+func (b *Broker) GetConsumerMetadata(clientID string, request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
+	response := new(ConsumerMetadataResponse)
+
+	err := b.sendAndReceive(clientID, request, response)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*OffsetResponse, error) {
 	response := new(OffsetResponse)
 

+ 12 - 0
broker_test.go

@@ -90,6 +90,18 @@ var brokerTestTable = []struct {
 			}
 		}},
 
+	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
+		func(t *testing.T, broker *Broker) {
+			request := ConsumerMetadataRequest{}
+			response, err := broker.GetConsumerMetadata("clientID", &request)
+			if err != nil {
+				t.Error(err)
+			}
+			if response == nil {
+				t.Error("Consumer Metadata request got no response!")
+			}
+		}},
+
 	{[]byte{},
 		func(t *testing.T, broker *Broker) {
 			request := ProduceRequest{}

+ 17 - 0
consumer_metadata_request.go

@@ -0,0 +1,17 @@
+package sarama
+
+type ConsumerMetadataRequest struct {
+	ConsumerGroup string
+}
+
+func (r *ConsumerMetadataRequest) encode(pe packetEncoder) error {
+	return pe.putString(r.ConsumerGroup)
+}
+
+func (r *ConsumerMetadataRequest) key() int16 {
+	return 10
+}
+
+func (r *ConsumerMetadataRequest) version() int16 {
+	return 0
+}

+ 19 - 0
consumer_metadata_request_test.go

@@ -0,0 +1,19 @@
+package sarama
+
+import "testing"
+
+var (
+	consumerMetadataRequestEmpty = []byte{
+		0x00, 0x00}
+
+	consumerMetadataRequestString = []byte{
+		0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r'}
+)
+
+func TestConsumerMetadataRequest(t *testing.T) {
+	request := new(ConsumerMetadataRequest)
+	testEncodable(t, "empty string", request, consumerMetadataRequestEmpty)
+
+	request.ConsumerGroup = "foobar"
+	testEncodable(t, "with string", request, consumerMetadataRequestString)
+}

+ 35 - 0
consumer_metadata_response.go

@@ -0,0 +1,35 @@
+package sarama
+
+type ConsumerMetadataResponse struct {
+	Err             KError
+	CoordinatorId   int32
+	CoordinatorHost string
+	CoordinatorPort int32
+}
+
+func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
+	tmp, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+	r.Err = KError(tmp)
+
+	if r.Err == NoError {
+		r.CoordinatorId, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+
+		r.CoordinatorHost, err = pd.getString()
+		if err != nil {
+			return err
+		}
+
+		r.CoordinatorPort, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}

+ 58 - 0
consumer_metadata_response_test.go

@@ -0,0 +1,58 @@
+package sarama
+
+import "testing"
+
+var (
+	consumerMetadataResponseError = []byte{
+		0x00, 0x0E}
+
+	consumerMetadataResponseSuccess = []byte{
+		0x00, 0x00,
+		0x00, 0x00, 0x00, 0xAB,
+		0x00, 0x03, 'f', 'o', 'o',
+		0x00, 0x00, 0xCC, 0xDD}
+)
+
+func TestConsumerMetadataResponseError(t *testing.T) {
+	response := ConsumerMetadataResponse{}
+
+	testDecodable(t, "error", &response, consumerMetadataResponseError)
+
+	if response.Err != OffsetsLoadInProgress {
+		t.Error("Decoding produced incorrect error value.")
+	}
+
+	if response.CoordinatorId != 0 {
+		t.Error("Decoding produced ID when the message contained an error.")
+	}
+
+	if len(response.CoordinatorHost) != 0 {
+		t.Error("Decoding produced host when the message contained an error.")
+	}
+
+	if response.CoordinatorPort != 0 {
+		t.Error("Decoding produced port when the message contained an error.")
+	}
+}
+
+func TestConsumerMetadataResponseSuccess(t *testing.T) {
+	response := ConsumerMetadataResponse{}
+
+	testDecodable(t, "success", &response, consumerMetadataResponseSuccess)
+
+	if response.Err != NoError {
+		t.Error("Decoding produced error value where there was none.")
+	}
+
+	if response.CoordinatorId != 0xAB {
+		t.Error("Decoding produced incorrect coordinator ID.")
+	}
+
+	if response.CoordinatorHost != "foo" {
+		t.Error("Decoding produced incorrect coordinator host.")
+	}
+
+	if response.CoordinatorPort != 0xCCDD {
+		t.Error("Decoding produced incorrect coordinator port.")
+	}
+}