Browse Source

Merge pull request #503 from mailgun/maxim/moremocks

Add mock responses for OffsetManager testing
Evan Huus 10 years ago
parent
commit
ba01d5073f

+ 22 - 7
consumer_metadata_response.go

@@ -20,10 +20,14 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
 	}
 	r.Err = KError(tmp)
 
-	r.Coordinator = new(Broker)
-	if err := r.Coordinator.decode(pd); err != nil {
+	coordinator := new(Broker)
+	if err := coordinator.decode(pd); err != nil {
 		return err
 	}
+	if coordinator.addr == ":0" {
+		return nil
+	}
+	r.Coordinator = coordinator
 
 	// this can all go away in 2.0, but we have to fill in deprecated fields to maintain
 	// backwards compatibility
@@ -43,16 +47,27 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
 }
 
 func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
-
 	pe.putInt16(int16(r.Err))
-
+	if r.Coordinator != nil {
+		host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
+		if err != nil {
+			return err
+		}
+		port, err := strconv.ParseInt(portstr, 10, 32)
+		if err != nil {
+			return err
+		}
+		pe.putInt32(r.Coordinator.ID())
+		if err := pe.putString(host); err != nil {
+			return err
+		}
+		pe.putInt32(int32(port))
+		return nil
+	}
 	pe.putInt32(r.CoordinatorID)
-
 	if err := pe.putString(r.CoordinatorHost); err != nil {
 		return err
 	}
-
 	pe.putInt32(r.CoordinatorPort)
-
 	return nil
 }

+ 12 - 46
consumer_metadata_response_test.go

@@ -17,53 +17,19 @@ var (
 )
 
 func TestConsumerMetadataResponseError(t *testing.T) {
-	response := ConsumerMetadataResponse{}
-
-	testDecodable(t, "error", &response, consumerMetadataResponseError)
-
-	if response.Err != ErrOffsetsLoadInProgress {
-		t.Error("Decoding produced incorrect error value.")
-	}
-
-	if response.CoordinatorID != 0 {
-		t.Error("Decoding produced incorrect ID.")
-	}
-
-	if len(response.CoordinatorHost) != 0 {
-		t.Error("Decoding produced incorrect host.")
-	}
-
-	if response.CoordinatorPort != 0 {
-		t.Error("Decoding produced incorrect port.")
-	}
+	response := ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress}
+	testResponse(t, "error", &response, consumerMetadataResponseError)
 }
 
 func TestConsumerMetadataResponseSuccess(t *testing.T) {
-	response := ConsumerMetadataResponse{}
-
-	testDecodable(t, "success", &response, consumerMetadataResponseSuccess)
-
-	if response.Err != ErrNoError {
-		t.Error("Decoding produced error value where there was none.")
-	}
-
-	if response.CoordinatorID != 0xAB {
-		t.Error("Decoding produced incorrect coordinator ID.")
-	}
-
-	if response.CoordinatorHost != "foo" {
-		t.Error("Decoding produced incorrect coordinator host.")
-	}
-
-	if response.CoordinatorPort != 0xCCDD {
-		t.Error("Decoding produced incorrect coordinator port.")
-	}
-
-	if response.Coordinator.ID() != 0xAB {
-		t.Error("Decoding produced incorrect coordinator ID.")
-	}
-
-	if response.Coordinator.Addr() != "foo:52445" {
-		t.Error("Decoding produced incorrect coordinator address.")
-	}
+	broker := NewBroker("foo:52445")
+	broker.id = 0xAB
+	response := ConsumerMetadataResponse{
+		Coordinator:     broker,
+		CoordinatorID:   0xAB,
+		CoordinatorHost: "foo",
+		CoordinatorPort: 0xCCDD,
+		Err:             ErrNoError,
+	}
+	testResponse(t, "success", &response, consumerMetadataResponseSuccess)
 }

+ 133 - 0
mockresponses_test.go

@@ -230,3 +230,136 @@ func (mfr *mockFetchResponse) getHighWaterMark(topic string, partition int32) in
 	}
 	return partitions[partition]
 }
+
+// mockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
+type mockConsumerMetadataResponse struct {
+	coordinators map[string]interface{}
+	t            *testing.T
+}
+
+func newMockConsumerMetadataResponse(t *testing.T) *mockConsumerMetadataResponse {
+	return &mockConsumerMetadataResponse{
+		coordinators: make(map[string]interface{}),
+		t:            t,
+	}
+}
+
+func (mr *mockConsumerMetadataResponse) SetCoordinator(group string, broker *mockBroker) *mockConsumerMetadataResponse {
+	mr.coordinators[group] = broker
+	return mr
+}
+
+func (mr *mockConsumerMetadataResponse) SetError(group string, kerror KError) *mockConsumerMetadataResponse {
+	mr.coordinators[group] = kerror
+	return mr
+}
+
+func (mr *mockConsumerMetadataResponse) For(reqBody decoder) encoder {
+	req := reqBody.(*ConsumerMetadataRequest)
+	group := req.ConsumerGroup
+	res := &ConsumerMetadataResponse{}
+	v := mr.coordinators[group]
+	switch v := v.(type) {
+	case *mockBroker:
+		res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
+	case KError:
+		res.Err = v
+	}
+	return res
+}
+
+// mockOffsetCommitResponse is a `OffsetCommitResponse` builder.
+type mockOffsetCommitResponse struct {
+	errors map[string]map[string]map[int32]KError
+	t      *testing.T
+}
+
+func newMockOffsetCommitResponse(t *testing.T) *mockOffsetCommitResponse {
+	return &mockOffsetCommitResponse{t: t}
+}
+
+func (mr *mockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *mockOffsetCommitResponse {
+	if mr.errors == nil {
+		mr.errors = make(map[string]map[string]map[int32]KError)
+	}
+	topics := mr.errors[group]
+	if topics == nil {
+		topics = make(map[string]map[int32]KError)
+		mr.errors[group] = topics
+	}
+	partitions := topics[topic]
+	if partitions == nil {
+		partitions = make(map[int32]KError)
+		topics[topic] = partitions
+	}
+	partitions[partition] = kerror
+	return mr
+}
+
+func (mr *mockOffsetCommitResponse) For(reqBody decoder) encoder {
+	req := reqBody.(*OffsetCommitRequest)
+	group := req.ConsumerGroup
+	res := &OffsetCommitResponse{}
+	for topic, partitions := range req.blocks {
+		for partition := range partitions {
+			res.AddError(topic, partition, mr.getError(group, topic, partition))
+		}
+	}
+	return res
+}
+
+func (mr *mockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
+	topics := mr.errors[group]
+	if topics == nil {
+		return ErrNoError
+	}
+	partitions := topics[topic]
+	if partitions == nil {
+		return ErrNoError
+	}
+	kerror, ok := partitions[partition]
+	if !ok {
+		return ErrNoError
+	}
+	return kerror
+}
+
+// mockOffsetFetchResponse is a `OffsetFetchResponse` builder.
+type mockOffsetFetchResponse struct {
+	offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
+	t       *testing.T
+}
+
+func newMockOffsetFetchResponse(t *testing.T) *mockOffsetFetchResponse {
+	return &mockOffsetFetchResponse{t: t}
+}
+
+func (mr *mockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *mockOffsetFetchResponse {
+	if mr.offsets == nil {
+		mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
+	}
+	topics := mr.offsets[group]
+	if topics == nil {
+		topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
+		mr.offsets[group] = topics
+	}
+	partitions := topics[topic]
+	if partitions == nil {
+		partitions = make(map[int32]*OffsetFetchResponseBlock)
+		topics[topic] = partitions
+	}
+	partitions[partition] = &OffsetFetchResponseBlock{offset, metadata, kerror}
+	return mr
+}
+
+func (mr *mockOffsetFetchResponse) For(reqBody decoder) encoder {
+	req := reqBody.(*OffsetFetchRequest)
+	group := req.ConsumerGroup
+	res := &OffsetFetchResponse{}
+	for topic, partitions := range mr.offsets[group] {
+		for partition, block := range partitions {
+			res.AddBlock(topic, partition, block)
+		}
+	}
+	return res
+}

+ 32 - 1
offset_commit_response.go

@@ -4,9 +4,40 @@ type OffsetCommitResponse struct {
 	Errors map[string]map[int32]KError
 }
 
+func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) {
+	if r.Errors == nil {
+		r.Errors = make(map[string]map[int32]KError)
+	}
+	partitions := r.Errors[topic]
+	if partitions == nil {
+		partitions = make(map[int32]KError)
+		r.Errors[topic] = partitions
+	}
+	partitions[partition] = kerror
+}
+
+func (r *OffsetCommitResponse) encode(pe packetEncoder) error {
+	if err := pe.putArrayLength(len(r.Errors)); err != nil {
+		return err
+	}
+	for topic, partitions := range r.Errors {
+		if err := pe.putString(topic); err != nil {
+			return err
+		}
+		if err := pe.putArrayLength(len(partitions)); err != nil {
+			return err
+		}
+		for partition, kerror := range partitions {
+			pe.putInt32(partition)
+			pe.putInt16(int16(kerror))
+		}
+	}
+	return nil
+}
+
 func (r *OffsetCommitResponse) decode(pd packetDecoder) (err error) {
 	numTopics, err := pd.getArrayLength()
-	if err != nil {
+	if err != nil || numTopics == 0 {
 		return err
 	}
 

+ 9 - 37
offset_commit_response_test.go

@@ -1,52 +1,24 @@
 package sarama
 
-import "testing"
+import (
+	"testing"
+)
 
 var (
 	emptyOffsetCommitResponse = []byte{
 		0x00, 0x00, 0x00, 0x00}
-
-	normalOffsetCommitResponse = []byte{
-		0x00, 0x00, 0x00, 0x02,
-
-		0x00, 0x01, 'm',
-		0x00, 0x00, 0x00, 0x00,
-
-		0x00, 0x01, 't',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x06}
 )
 
 func TestEmptyOffsetCommitResponse(t *testing.T) {
 	response := OffsetCommitResponse{}
-
-	testDecodable(t, "empty", &response, emptyOffsetCommitResponse)
-
-	if len(response.Errors) != 0 {
-		t.Error("Decoding produced errors where there were none.")
-	}
+	testResponse(t, "empty", &response, emptyOffsetCommitResponse)
 }
 
 func TestNormalOffsetCommitResponse(t *testing.T) {
 	response := OffsetCommitResponse{}
-
-	testDecodable(t, "normal", &response, normalOffsetCommitResponse)
-
-	if len(response.Errors) != 2 {
-		t.Fatal("Decoding produced wrong number of errors.")
-	}
-
-	if len(response.Errors["m"]) != 0 {
-		t.Error("Decoding produced errors for topic 'm' where there were none.")
-	}
-
-	if len(response.Errors["t"]) != 1 {
-		t.Fatal("Decoding produced wrong number of errors for topic 't'.")
-	}
-
-	if response.Errors["t"][0] != ErrNotLeaderForPartition {
-		t.Error("Decoding produced wrong error for topic 't' partition 0.")
-	}
-
+	response.AddError("t", 0, ErrNotLeaderForPartition)
+	response.Errors["m"] = make(map[int32]KError)
+	// The response encoded form cannot be checked for it varies due to
+	// unpredictable map traversal order.
+	testResponse(t, "normal", &response, nil)
 }

+ 50 - 1
offset_fetch_response.go

@@ -43,9 +43,30 @@ type OffsetFetchResponse struct {
 	Blocks map[string]map[int32]*OffsetFetchResponseBlock
 }
 
+func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
+	if err := pe.putArrayLength(len(r.Blocks)); err != nil {
+		return err
+	}
+	for topic, partitions := range r.Blocks {
+		if err := pe.putString(topic); err != nil {
+			return err
+		}
+		if err := pe.putArrayLength(len(partitions)); err != nil {
+			return err
+		}
+		for partition, block := range partitions {
+			pe.putInt32(partition)
+			if err := block.encode(pe); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
 func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) {
 	numTopics, err := pd.getArrayLength()
-	if err != nil {
+	if err != nil || numTopics == 0 {
 		return err
 	}
 
@@ -61,6 +82,10 @@ func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) {
 			return err
 		}
 
+		if numBlocks == 0 {
+			r.Blocks[name] = nil
+			continue
+		}
 		r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
 
 		for j := 0; j < numBlocks; j++ {
@@ -80,3 +105,27 @@ func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {
+	if r.Blocks == nil {
+		return nil
+	}
+
+	if r.Blocks[topic] == nil {
+		return nil
+	}
+
+	return r.Blocks[topic][partition]
+}
+
+func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock) {
+	if r.Blocks == nil {
+		r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock)
+	}
+	partitions := r.Blocks[topic]
+	if partitions == nil {
+		partitions = make(map[int32]*OffsetFetchResponseBlock)
+		r.Blocks[topic] = partitions
+	}
+	partitions[partition] = block
+}

+ 6 - 45
offset_fetch_response_test.go

@@ -5,57 +5,18 @@ import "testing"
 var (
 	emptyOffsetFetchResponse = []byte{
 		0x00, 0x00, 0x00, 0x00}
-
-	normalOffsetFetchResponse = []byte{
-		0x00, 0x00, 0x00, 0x02,
-
-		0x00, 0x01, 'm',
-		0x00, 0x00, 0x00, 0x00,
-
-		0x00, 0x01, 't',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-		0x00, 0x02, 'm', 'd',
-		0x00, 0x07}
 )
 
 func TestEmptyOffsetFetchResponse(t *testing.T) {
 	response := OffsetFetchResponse{}
-
-	testDecodable(t, "empty", &response, emptyOffsetFetchResponse)
-
-	if len(response.Blocks) != 0 {
-		t.Error("Decoding produced topic blocks where there were none.")
-	}
+	testResponse(t, "empty", &response, emptyOffsetFetchResponse)
 }
 
 func TestNormalOffsetFetchResponse(t *testing.T) {
 	response := OffsetFetchResponse{}
-
-	testDecodable(t, "normal", &response, normalOffsetFetchResponse)
-
-	if len(response.Blocks) != 2 {
-		t.Fatal("Decoding produced wrong number of blocks.")
-	}
-
-	if len(response.Blocks["m"]) != 0 {
-		t.Error("Decoding produced partitions for topic 'm' where there were none.")
-	}
-
-	if len(response.Blocks["t"]) != 1 {
-		t.Fatal("Decoding produced wrong number of blocks for topic 't'.")
-	}
-
-	if response.Blocks["t"][0].Offset != 0 {
-		t.Error("Decoding produced wrong offset for topic 't' partition 0.")
-	}
-
-	if response.Blocks["t"][0].Metadata != "md" {
-		t.Error("Decoding produced wrong metadata for topic 't' partition 0.")
-	}
-
-	if response.Blocks["t"][0].Err != ErrRequestTimedOut {
-		t.Error("Decoding produced wrong error for topic 't' partition 0.")
-	}
+	response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, "md", ErrRequestTimedOut})
+	response.Blocks["m"] = nil
+	// The response encoded form cannot be checked for it varies due to
+	// unpredictable map traversal order.
+	testResponse(t, "normal", &response, nil)
 }

+ 19 - 1
request_test.go

@@ -57,6 +57,24 @@ func testRequest(t *testing.T, name string, rb requestBody, expected []byte) {
 	} else if decoded.correlationID != 123 || decoded.clientID != "foo" {
 		t.Errorf("Decoded header is not valid: %v", decoded)
 	} else if !reflect.DeepEqual(rb, decoded.body) {
-		t.Errorf("Decoded request does not match the encoded one\n    encoded: %v\n    decoded: %v", rb, decoded)
+		t.Errorf("Decoded request does not match the encoded one\nencoded: %v\ndecoded: %v", rb, decoded)
+	}
+}
+
+func testResponse(t *testing.T, name string, res encoder, expected []byte) {
+	encoded, err := encode(res)
+	if err != nil {
+		t.Error(err)
+	} else if expected != nil && !bytes.Equal(encoded, expected) {
+		t.Error("Encoding", name, "failed\ngot ", encoded, "\nwant", expected)
+	}
+
+	decoded := reflect.New(reflect.TypeOf(res).Elem()).Interface().(decoder)
+	if err := decode(encoded, decoded); err != nil {
+		t.Error("Decoding", name, "failed:", err)
+	}
+
+	if !reflect.DeepEqual(decoded, res) {
+		t.Errorf("Decoded response does not match the encoded one\nencoded: %#v\ndecoded: %#v", res, decoded)
 	}
 }