Forráskód Böngészése

Implement consumer metadata request/response pair

Per latest version of the spec.
Evan Huus 11 éve
szülő
commit
9520ed2cbd

+ 17 - 0
consumer_metadata_request.go

@@ -0,0 +1,17 @@
+package sarama
+
+type ConsumerMetadataRequest struct {
+	ConsumerGroup string
+}
+
+func (r *ConsumerMetadataRequest) encode(pe packetEncoder) error {
+	return pe.putString(r.ConsumerGroup)
+}
+
+func (r *ConsumerMetadataRequest) key() int16 {
+	return 10
+}
+
+func (r *ConsumerMetadataRequest) version() int16 {
+	return 0
+}

+ 19 - 0
consumer_metadata_request_test.go

@@ -0,0 +1,19 @@
+package sarama
+
+import "testing"
+
+var (
+	consumerMetadataRequestEmpty = []byte{
+		0x00, 0x00}
+
+	consumerMetadataRequestString = []byte{
+		0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r'}
+)
+
+func TestConsumerMetadataRequest(t *testing.T) {
+	request := new(ConsumerMetadataRequest)
+	testEncodable(t, "empty string", request, consumerMetadataRequestEmpty)
+
+	request.ConsumerGroup = "foobar"
+	testEncodable(t, "with string", request, consumerMetadataRequestString)
+}

+ 35 - 0
consumer_metadata_response.go

@@ -0,0 +1,35 @@
+package sarama
+
+type ConsumerMetadataResponse struct {
+	Err             KError
+	CoordinatorId   int32
+	CoordinatorHost string
+	CoordinatorPort int32
+}
+
+func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
+	tmp, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+	r.Err = KError(tmp)
+
+	if r.Err == NoError {
+		r.CoordinatorId, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+
+		r.CoordinatorHost, err = pd.getString()
+		if err != nil {
+			return err
+		}
+
+		r.CoordinatorPort, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}

+ 58 - 0
consumer_metadata_response_test.go

@@ -0,0 +1,58 @@
+package sarama
+
+import "testing"
+
+var (
+	consumerMetadataResponseError = []byte{
+		0x00, 0x0E}
+
+	consumerMetadataResponseSuccess = []byte{
+		0x00, 0x00,
+		0x00, 0x00, 0x00, 0xAB,
+		0x00, 0x03, 'f', 'o', 'o',
+		0x00, 0x00, 0xCC, 0xDD}
+)
+
+func TestConsumerMetadataResponseError(t *testing.T) {
+	response := ConsumerMetadataResponse{}
+
+	testDecodable(t, "error", &response, consumerMetadataResponseError)
+
+	if response.Err != OffsetsLoadInProgress {
+		t.Error("Decoding produced incorrect error value.")
+	}
+
+	if response.CoordinatorId != 0 {
+		t.Error("Decoding produced ID when the message contained an error.")
+	}
+
+	if len(response.CoordinatorHost) != 0 {
+		t.Error("Decoding produced host when the message contained an error.")
+	}
+
+	if response.CoordinatorPort != 0 {
+		t.Error("Decoding produced port when the message contained an error.")
+	}
+}
+
+func TestConsumerMetadataResponseSuccess(t *testing.T) {
+	response := ConsumerMetadataResponse{}
+
+	testDecodable(t, "success", &response, consumerMetadataResponseSuccess)
+
+	if response.Err != NoError {
+		t.Error("Decoding produced error value where there was none.")
+	}
+
+	if response.CoordinatorId != 0xAB {
+		t.Error("Decoding produced incorrect coordinator ID.")
+	}
+
+	if response.CoordinatorHost != "foo" {
+		t.Error("Decoding produced incorrect coordinator host.")
+	}
+
+	if response.CoordinatorPort != 0xCCDD {
+		t.Error("Decoding produced incorrect coordinator port.")
+	}
+}