Prechádzať zdrojové kódy

Translated more tests to new mockbroker format

Burke Libbey 12 rokov pred
rodič
commit
86e0c02890

+ 1 - 0
.gitignore

@@ -2,6 +2,7 @@
 *.o
 *.a
 *.so
+*.test
 
 # Folders
 _obj

+ 43 - 76
consumer_test.go

@@ -1,8 +1,8 @@
 package sarama
 
 import (
-	"encoding/binary"
 	"fmt"
+	"github.com/Shopify/sarama/mockbroker"
 	"testing"
 	"time"
 )
@@ -35,44 +35,20 @@ var (
 )
 
 func TestSimpleConsumer(t *testing.T) {
-	masterResponses := make(chan []byte, 1)
-	extraResponses := make(chan []byte)
-	mockBroker := NewMockBroker(t, masterResponses)
-	mockExtra := NewMockBroker(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := make([]byte, len(extraBrokerMetadata))
-	copy(response, extraBrokerMetadata)
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	masterResponses <- response
-	go func() {
-		for i := 0; i < 10; i++ {
-			msg := []byte{
-				0x00, 0x00, 0x00, 0x01,
-				0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-				0x00, 0x00, 0x00, 0x01,
-				0x00, 0x00, 0x00, 0x00,
-				0x00, 0x00,
-				0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-				0x00, 0x00, 0x00, 0x1C,
-				// messageSet
-				0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-				0x00, 0x00, 0x00, 0x10,
-				// message
-				0x23, 0x96, 0x4a, 0xf7, // CRC
-				0x00,
-				0x00,
-				0xFF, 0xFF, 0xFF, 0xFF,
-				0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
-			binary.BigEndian.PutUint64(msg[36:], uint64(i))
-			extraResponses <- msg
-		}
-		extraResponses <- consumerStopper
-	}()
 
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	mb1 := mockbroker.New(t, 1)
+	mb2 := mockbroker.New(t, 2)
+
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb2).
+		AddTopicPartition("my_topic", 0, 2)
+
+	for i := 0; i < 10; i++ {
+		mb2.ExpectFetchRequest().
+			AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), uint64(i))
+	}
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -83,6 +59,8 @@ func TestSimpleConsumer(t *testing.T) {
 		t.Fatal(err)
 	}
 	defer consumer.Close()
+	defer mb1.Close()
+	defer mb2.Close()
 
 	for i := 0; i < 10; i++ {
 		event := <-consumer.Events()
@@ -93,24 +71,19 @@ func TestSimpleConsumer(t *testing.T) {
 			t.Error("Incorrect message offset!")
 		}
 	}
+
 }
 
 func TestConsumerRawOffset(t *testing.T) {
-	masterResponses := make(chan []byte, 1)
-	extraResponses := make(chan []byte, 1)
-	mockBroker := NewMockBroker(t, masterResponses)
-	mockExtra := NewMockBroker(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := make([]byte, len(extraBrokerMetadata))
-	copy(response, extraBrokerMetadata)
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	masterResponses <- response
-	extraResponses <- consumerStopper
-
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+
+	mb1 := mockbroker.New(t, 1)
+	mb2 := mockbroker.New(t, 2)
+
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb2).
+		AddTopicPartition("my_topic", 0, 2)
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -122,36 +95,27 @@ func TestConsumerRawOffset(t *testing.T) {
 	}
 	defer consumer.Close()
 
+	defer mb1.Close()
+	defer mb2.Close()
+
 	if consumer.offset != 1234 {
 		t.Error("Raw offset not set correctly")
 	}
 }
 
 func TestConsumerLatestOffset(t *testing.T) {
-	masterResponses := make(chan []byte, 1)
-	extraResponses := make(chan []byte, 2)
-	mockBroker := NewMockBroker(t, masterResponses)
-	mockExtra := NewMockBroker(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := make([]byte, len(extraBrokerMetadata))
-	copy(response, extraBrokerMetadata)
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	masterResponses <- response
-	extraResponses <- []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01,
-	}
-	extraResponses <- consumerStopper
 
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	mb1 := mockbroker.New(t, 1)
+	mb2 := mockbroker.New(t, 2)
+
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb2).
+		AddTopicPartition("my_topic", 0, 2)
+
+	mb2.ExpectOffsetFetchRequest().
+		AddTopicPartition("my_topic", 0, 0x010101)
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -163,6 +127,9 @@ func TestConsumerLatestOffset(t *testing.T) {
 	}
 	defer consumer.Close()
 
+	defer mb2.Close()
+	defer mb1.Close()
+
 	if consumer.offset != 0x010101 {
 		t.Error("Latest offset not fetched correctly")
 	}

+ 135 - 0
mockbroker/fetch_request_expectation.go

@@ -0,0 +1,135 @@
+package mockbroker
+
+import (
+	"bytes"
+	"encoding/binary"
+	"hash/crc32"
+)
+
+type FetchRequestExpectation struct {
+	messages []fetchResponseMessage
+}
+
+type encoder interface {
+	Encode() ([]byte, error)
+}
+
+type fetchResponseMessage struct {
+	topic      string
+	partition  int32
+	key, value encoder
+	offset     uint64
+}
+
+func (e *FetchRequestExpectation) AddMessage(
+	topic string, partition int32, key, value encoder, offset uint64,
+) *FetchRequestExpectation {
+	e.messages = append(e.messages, fetchResponseMessage{
+		topic:     topic,
+		partition: partition,
+		key:       key,
+		value:     value,
+		offset:    offset,
+	})
+	return e
+}
+
+func (b *MockBroker) ExpectFetchRequest() *FetchRequestExpectation {
+	e := &FetchRequestExpectation{}
+	b.expectations <- e
+	return e
+}
+
+func (e *FetchRequestExpectation) ResponseBytes() []byte {
+	buf := new(bytes.Buffer)
+
+	byTopic := make(map[string][]fetchResponseMessage)
+	for _, frm := range e.messages {
+		byTopic[frm.topic] = append(byTopic[frm.topic], frm)
+	}
+
+	binary.Write(buf, binary.BigEndian, uint32(len(byTopic)))
+	for topic, messages := range byTopic {
+		binary.Write(buf, binary.BigEndian, uint16(len(topic)))
+		buf.Write([]byte(topic))
+
+		byPartition := make(map[int32][]fetchResponseMessage)
+		for _, frm := range messages {
+			byPartition[frm.partition] = append(byPartition[frm.partition], frm)
+		}
+
+		binary.Write(buf, binary.BigEndian, uint32(len(byPartition)))
+
+		for partition, messages := range byPartition {
+			binary.Write(buf, binary.BigEndian, uint32(partition))
+			binary.Write(buf, binary.BigEndian, uint16(0)) // error
+			binary.Write(buf, binary.BigEndian, uint64(0)) // high water mark offset
+
+			messageSetBuffer := new(bytes.Buffer)
+
+			var maxOffset uint64
+
+			for _, msg := range messages {
+				chunk := new(bytes.Buffer)
+
+				binary.Write(chunk, binary.BigEndian, uint8(0)) // format
+				binary.Write(chunk, binary.BigEndian, uint8(0)) // attribute
+
+				if msg.offset > maxOffset {
+					maxOffset = msg.offset
+				}
+
+				if msg.key == nil {
+					binary.Write(chunk, binary.BigEndian, int32(-1))
+				} else {
+					bytes, _ := msg.key.Encode()
+					binary.Write(chunk, binary.BigEndian, int32(len(bytes)))
+					chunk.Write(bytes)
+				}
+
+				if msg.value == nil {
+					binary.Write(chunk, binary.BigEndian, int32(-1))
+				} else {
+					bytes, _ := msg.value.Encode()
+					binary.Write(chunk, binary.BigEndian, int32(len(bytes)))
+					chunk.Write(bytes)
+				}
+
+				cksum := crc32.ChecksumIEEE(chunk.Bytes())
+				length := len(chunk.Bytes()) + 4
+
+				binary.Write(messageSetBuffer, binary.BigEndian, uint32(length)) // message length
+				binary.Write(messageSetBuffer, binary.BigEndian, uint32(cksum))  // CRC
+				messageSetBuffer.Write(chunk.Bytes())
+			}
+
+			binary.Write(buf, binary.BigEndian, uint32(len(messageSetBuffer.Bytes())+8)) // msgSet size
+			binary.Write(buf, binary.BigEndian, uint64(maxOffset))                       // offset
+			buf.Write(messageSetBuffer.Bytes())
+
+		}
+
+	}
+
+	/*
+		sample response:
+
+		0x00, 0x00, 0x00, 0x01, // number of topics
+		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of blocks for this topic
+		0x00, 0x00, 0x00, 0x00, // partition id
+		0x00, 0x00, // error
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // high water mark offset
+		// messageSet
+		0x00, 0x00, 0x00, 0x1C, // messageset size
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // offset
+		// message
+		0x00, 0x00, 0x00, 0x10, // length of message (?)
+		0x23, 0x96, 0x4a, 0xf7, // CRC32
+		0x00, // format
+		0x00, // attribute (compression)
+		0xFF, 0xFF, 0xFF, 0xFF, // key (nil)
+		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, // value
+	*/
+	return buf.Bytes()
+}

+ 37 - 0
mockbroker/fetch_request_expectation_test.go

@@ -0,0 +1,37 @@
+package mockbroker
+
+import (
+	"bytes"
+	"github.com/shopify/sarama"
+	"testing"
+)
+
+func TestFetchRequestSerialization(t *testing.T) {
+
+	exp := new(FetchRequestExpectation).
+		AddMessage("my_topic", 0, nil, sarama.ByteEncoder([]byte{0x00, 0xEE}), 3)
+
+	expected := []byte{
+		0x00, 0x00, 0x00, 0x01, // number of topics
+		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of blocks for this topic
+		0x00, 0x00, 0x00, 0x00, // partition id
+		0x00, 0x00, // error
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // high water mark offset
+		// messageSet
+		0x00, 0x00, 0x00, 0x1C, // messageset size
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, // offset
+		// message
+		0x00, 0x00, 0x00, 0x10, // length of message (?)
+		0x23, 0x96, 0x4a, 0xf7, // CRC32
+		0x00,                   // format
+		0x00,                   // attribute (compression)
+		0xFF, 0xFF, 0xFF, 0xFF, // key (nil)
+		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, // value
+	}
+
+	actual := exp.ResponseBytes()
+	if bytes.Compare(actual, expected) != 0 {
+		t.Error("\nExpected\n", expected, "\nbut got\n", actual)
+	}
+}

+ 65 - 0
mockbroker/offset_fetch_request_expectation.go

@@ -0,0 +1,65 @@
+package mockbroker
+
+import (
+	"bytes"
+	"encoding/binary"
+)
+
+type OffsetFetchRequestExpectation struct {
+	topicPartitions []offsetFetchRequestTP
+}
+
+type offsetFetchRequestTP struct {
+	topic     string
+	partition int32
+	offset    uint64
+}
+
+func (e *OffsetFetchRequestExpectation) AddTopicPartition(
+	topic string, partition int32, offset uint64,
+) *OffsetFetchRequestExpectation {
+	ofrtp := offsetFetchRequestTP{topic, partition, offset}
+	e.topicPartitions = append(e.topicPartitions, ofrtp)
+	return e
+}
+
+func (b *MockBroker) ExpectOffsetFetchRequest() *OffsetFetchRequestExpectation {
+	e := &OffsetFetchRequestExpectation{}
+	b.expectations <- e
+	return e
+}
+
+func (e *OffsetFetchRequestExpectation) ResponseBytes() []byte {
+	buf := new(bytes.Buffer)
+
+	byTopic := make(map[string][]offsetFetchRequestTP)
+	for _, ofrtp := range e.topicPartitions {
+		byTopic[ofrtp.topic] = append(byTopic[ofrtp.topic], ofrtp)
+	}
+
+	binary.Write(buf, binary.BigEndian, uint32(len(byTopic)))
+	for topic, tps := range byTopic {
+		binary.Write(buf, binary.BigEndian, uint16(len(topic)))
+		buf.Write([]byte(topic))
+		binary.Write(buf, binary.BigEndian, uint32(len(tps)))
+		for _, tp := range tps {
+			binary.Write(buf, binary.BigEndian, uint32(tp.partition))
+			binary.Write(buf, binary.BigEndian, uint16(0)) // error
+			binary.Write(buf, binary.BigEndian, uint32(1))
+			binary.Write(buf, binary.BigEndian, uint64(tp.offset)) // offset
+		}
+	}
+
+	/*
+		sample response:
+
+		0x00, 0x00, 0x00, 0x01, // number of topics
+		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of blocks for this partition
+		0x00, 0x00, 0x00, 0x00, // partition id
+		0x00, 0x00, // error
+		0x00, 0x00, 0x00, 0x01, // number of offsets
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, // offset
+	*/
+	return buf.Bytes()
+}

+ 27 - 0
mockbroker/offset_fetch_request_expectation_test.go

@@ -0,0 +1,27 @@
+package mockbroker
+
+import (
+	"bytes"
+	"testing"
+)
+
+func TestOffsetFetchRequestSerialization(t *testing.T) {
+
+	exp := new(OffsetFetchRequestExpectation).
+		AddTopicPartition("my_topic", 0, 0x010101)
+
+	expected := []byte{
+		0x00, 0x00, 0x00, 0x01, // number of topics
+		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of blocks for this partition
+		0x00, 0x00, 0x00, 0x00, // partition id
+		0x00, 0x00, // error
+		0x00, 0x00, 0x00, 0x01, // number of offsets
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, // offset
+	}
+
+	actual := exp.ResponseBytes()
+	if bytes.Compare(actual, expected) != 0 {
+		t.Error("\nExpected\n", expected, "\nbut got\n", actual)
+	}
+}