Browse Source

Beefier multibroker for cleaner testing

Burke Libbey 12 years ago
parent
commit
cb0815d794

+ 123 - 0
mockbroker/metadata_request_expectation.go

@@ -0,0 +1,123 @@
+package mockbroker
+
+import (
+	"bytes"
+	"encoding/binary"
+)
+
+type MetadataRequestExpectation struct {
+	err             error
+	topicPartitions []metadataRequestTP
+	brokers         []mockbrokerish
+}
+
+type metadataRequestTP struct {
+	topic     string
+	partition int32
+	brokerId  int
+}
+
+type mockbrokerish interface {
+	BrokerID() int
+	Port() int32
+}
+
+func (e *MetadataRequestExpectation) AddBroker(b mockbrokerish) *MetadataRequestExpectation {
+	e.brokers = append(e.brokers, b)
+	return e
+}
+
+func (e *MetadataRequestExpectation) AddTopicPartition(
+	topic string, partition int32, brokerId int,
+) *MetadataRequestExpectation {
+	mrtp := metadataRequestTP{topic, partition, brokerId}
+	e.topicPartitions = append(e.topicPartitions, mrtp)
+	return e
+}
+
+func (e *MetadataRequestExpectation) Error(err error) *MetadataRequestExpectation {
+	e.err = err
+	return e
+}
+
+func (e *MetadataRequestExpectation) ResponseBytes() []byte {
+	buf := new(bytes.Buffer)
+
+	binary.Write(buf, binary.BigEndian, uint32(len(e.brokers)))
+	for _, broker := range e.brokers {
+		binary.Write(buf, binary.BigEndian, uint32(broker.BrokerID()))
+		buf.Write([]byte{0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't'})
+		binary.Write(buf, binary.BigEndian, uint32(broker.Port()))
+	}
+
+	byTopic := make(map[string][]metadataRequestTP)
+	for _, mrtp := range e.topicPartitions {
+		byTopic[mrtp.topic] = append(byTopic[mrtp.topic], mrtp)
+	}
+
+	binary.Write(buf, binary.BigEndian, uint32(len(byTopic)))
+	for topic, tps := range byTopic {
+		// we don't support mocking errors at topic-level; only partition-level
+		binary.Write(buf, binary.BigEndian, uint16(0))
+		binary.Write(buf, binary.BigEndian, uint16(len(topic)))
+		buf.Write([]byte(topic))
+		binary.Write(buf, binary.BigEndian, uint32(len(tps)))
+		for _, tp := range tps {
+			binary.Write(buf, binary.BigEndian, uint16(0)) // TODO: Write the error code instead
+			binary.Write(buf, binary.BigEndian, uint32(tp.partition))
+			binary.Write(buf, binary.BigEndian, uint32(tp.brokerId))
+			binary.Write(buf, binary.BigEndian, uint32(0)) // replica set
+			binary.Write(buf, binary.BigEndian, uint32(0)) // ISR set
+		}
+	}
+
+	// Sample response:
+	/*
+		0x00, 0x00, 0x00, 0x02, // 0:3 number of brokers
+
+		0x00, 0x00, 0x00, 0x01, // 4:7 broker ID
+		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 8:18 hostname
+		0xFF, 0xFF, 0xFF, 0xFF, // 19:22 port will be written here.
+
+		0x00, 0x00, 0x00, 0x02, // 23:26 broker ID
+		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 27:37 hostname
+		0xFF, 0xFF, 0xFF, 0xFF, // 38:41 port will be written here.
+
+		0x00, 0x00, 0x00, 0x03, // number of topic metadata records
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x01, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x02, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x02, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+	*/
+
+	return buf.Bytes()
+}
+
+func (b *MockBroker) ExpectMetadataRequest() *MetadataRequestExpectation {
+	e := &MetadataRequestExpectation{}
+	b.expectations <- e
+	return e
+}

+ 70 - 0
mockbroker/metadata_request_expectation_test.go

@@ -0,0 +1,70 @@
+package mockbroker
+
+import (
+	"bytes"
+	"testing"
+)
+
+type mockMockBroker struct {
+	id   int
+	port int32
+}
+
+func (b mockMockBroker) BrokerID() int { return b.id }
+func (b mockMockBroker) Port() int32   { return b.port }
+
+func TestMetadataRequestSerialization(t *testing.T) {
+
+	exp := new(MetadataRequestExpectation).
+		AddBroker(mockMockBroker{1, 8080}).
+		AddBroker(mockMockBroker{2, 8081}).
+		AddTopicPartition("topic_a", 0, 1).
+		AddTopicPartition("topic_b", 0, 2).
+		AddTopicPartition("topic_c", 0, 2)
+
+	expected := []byte{
+		0x00, 0x00, 0x00, 0x02, // 0:3 number of brokers
+
+		0x00, 0x00, 0x00, 0x01, // 4:7 broker ID
+		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 8:18 hostname
+		0x00, 0x00, 0x1F, 0x90, // 19:22 port
+
+		0x00, 0x00, 0x00, 0x02, // 23:26 broker ID
+		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 27:37 hostname
+		0x00, 0x00, 0x1F, 0x91, // 38:41 port
+
+		0x00, 0x00, 0x00, 0x03, // number of topic metadata records
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x01, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x02, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x02, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+	}
+
+	actual := exp.ResponseBytes()
+	if bytes.Compare(actual, expected) != 0 {
+		t.Errorf("\nExpected\n% 2x\nbut got\n% 2x", expected, actual)
+	}
+}

+ 142 - 0
mockbroker/mockbroker.go

@@ -0,0 +1,142 @@
+package mockbroker
+
+import (
+	"encoding/binary"
+	"errors"
+	"io"
+	"net"
+	"strconv"
+	"testing"
+)
+
+// MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
+// accepts a single connection. It reads Kafka requests from that connection and returns each response
+// from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
+// the server sleeps for 250ms instead of reading a request).
+//
+// When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
+// waiting for a response, the test panics.
+//
+// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
+// automatically as a convenience.
+type MockBroker struct {
+	brokerID     int
+	port         int32
+	stopper      chan bool
+	expectations chan Expectation
+	listener     net.Listener
+	t            *testing.T
+	expecting    bool
+}
+
+func (b *MockBroker) BrokerID() int {
+	return b.brokerID
+}
+
+type Expectation interface {
+	ResponseBytes() []byte
+}
+
+func (b *MockBroker) Port() int32 {
+	return b.port
+}
+
+func (b *MockBroker) Addr() string {
+	return b.listener.Addr().String()
+}
+
+func (b *MockBroker) Close() {
+	if b.expecting {
+		b.t.Fatalf("Not all expectations were satisfied in mockBroker with ID=%d!", b.BrokerID())
+	}
+	close(b.expectations)
+	<-b.stopper
+}
+
+func (b *MockBroker) serverLoop() (ok bool) {
+	var (
+		err  error
+		conn net.Conn
+	)
+
+	defer close(b.stopper)
+	if conn, err = b.listener.Accept(); err != nil {
+		return b.serverError(err, conn)
+	}
+	reqHeader := make([]byte, 4)
+	resHeader := make([]byte, 8)
+	for expectation := range b.expectations {
+		b.expecting = true
+		_, err = io.ReadFull(conn, reqHeader)
+		b.expecting = false
+		if err != nil {
+			return b.serverError(err, conn)
+		}
+		body := make([]byte, binary.BigEndian.Uint32(reqHeader))
+		if len(body) < 10 {
+			return b.serverError(errors.New("Kafka request too short."), conn)
+		}
+		if _, err = io.ReadFull(conn, body); err != nil {
+			return b.serverError(err, conn)
+		}
+		response := expectation.ResponseBytes()
+
+		binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
+		binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
+		if _, err = conn.Write(resHeader); err != nil {
+			return b.serverError(err, conn)
+		}
+		if _, err = conn.Write(response); err != nil {
+			return b.serverError(err, conn)
+		}
+	}
+	if err = conn.Close(); err != nil {
+		return b.serverError(err, nil)
+	}
+	if err = b.listener.Close(); err != nil {
+		b.t.Error(err)
+		return false
+	}
+	return true
+}
+
+func (b *MockBroker) serverError(err error, conn net.Conn) bool {
+	b.t.Error(err)
+	if conn != nil {
+		conn.Close()
+	}
+	b.listener.Close()
+	return false
+}
+
+// New launches a fake Kafka broker. It takes a testing.T as provided by the
+// test framework and a channel of responses to use.  If an error occurs it is
+// simply logged to the testing.T and the broker exits.
+func New(t *testing.T, brokerID int) *MockBroker {
+	var err error
+
+	broker := &MockBroker{
+		stopper:      make(chan bool),
+		t:            t,
+		brokerID:     brokerID,
+		expectations: make(chan Expectation, 512),
+	}
+
+	broker.listener, err = net.Listen("tcp", "localhost:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
+	if err != nil {
+		t.Fatal(err)
+	}
+	tmp, err := strconv.ParseInt(portStr, 10, 32)
+	if err != nil {
+		t.Fatal(err)
+	}
+	broker.port = int32(tmp)
+
+	go broker.serverLoop()
+
+	return broker
+}

+ 77 - 0
mockbroker/produce_request_expectation.go

@@ -0,0 +1,77 @@
+package mockbroker
+
+import (
+	"bytes"
+	"encoding/binary"
+)
+
+type ProduceRequestExpectation struct {
+	err             error
+	topicPartitions []produceRequestTP
+}
+
+type produceRequestTP struct {
+	topic     string
+	partition int32
+	nMessages int
+	err       error
+}
+
+func (e *ProduceRequestExpectation) AddTopicPartition(
+	topic string, partition int32, nMessages int, err error,
+) *ProduceRequestExpectation {
+	prtp := produceRequestTP{topic, partition, nMessages, err}
+	e.topicPartitions = append(e.topicPartitions, prtp)
+	return e
+}
+
+func (b *MockBroker) ExpectProduceRequest() *ProduceRequestExpectation {
+	e := &ProduceRequestExpectation{}
+	b.expectations <- e
+	return e
+}
+
+func (e *ProduceRequestExpectation) ResponseBytes() []byte {
+	buf := new(bytes.Buffer)
+
+	byTopic := make(map[string][]produceRequestTP)
+	for _, prtp := range e.topicPartitions {
+		byTopic[prtp.topic] = append(byTopic[prtp.topic], prtp)
+	}
+
+	binary.Write(buf, binary.BigEndian, uint32(len(byTopic)))
+	for topic, tps := range byTopic {
+		binary.Write(buf, binary.BigEndian, uint16(len(topic)))
+		buf.Write([]byte(topic)) // TODO: Does this write a null?
+		binary.Write(buf, binary.BigEndian, uint32(len(tps)))
+		for _, tp := range tps {
+			binary.Write(buf, binary.BigEndian, uint32(tp.partition))
+			binary.Write(buf, binary.BigEndian, uint16(0)) // TODO: error
+			binary.Write(buf, binary.BigEndian, uint64(0)) // offset
+		}
+	}
+
+	/*
+	   sample response:
+
+	   0x00, 0x00, 0x00, 0x02, // 0:3 number of topics
+
+	   0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // 4:12 topic name
+	   0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
+	   0x00, 0x00, 0x00, 0x00, // 17:20 partition id
+	   0x00, 0x00, // 21:22 error: 0 means no error
+	   0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
+
+	   0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // 4:12 topic name
+	   0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
+	   0x00, 0x00, 0x00, 0x00, // 17:20 partition id
+	   0x00, 0x00, // 21:22 error: 0 means no error
+	   0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
+	*/
+	return buf.Bytes()
+}
+
+func (e *ProduceRequestExpectation) Error(err error) *ProduceRequestExpectation {
+	e.err = err
+	return e
+}

+ 34 - 0
mockbroker/produce_request_expectation_test.go

@@ -0,0 +1,34 @@
+package mockbroker
+
+import (
+	"bytes"
+	"testing"
+)
+
+func TestProduceRequestSerialization(t *testing.T) {
+
+	exp := new(ProduceRequestExpectation).
+		AddTopicPartition("topic_b", 0, 1, nil).
+		AddTopicPartition("topic_c", 0, 1, nil)
+
+	expected := []byte{
+		0x00, 0x00, 0x00, 0x02, // 0:3 number of topics
+
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // 4:12 topic name
+		0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
+		0x00, 0x00, 0x00, 0x00, // 17:20 partition id
+		0x00, 0x00, // 21:22 error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 23:30 offset
+
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of blocks for this topic
+		0x00, 0x00, 0x00, 0x00, // partition id
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // offset
+	}
+
+	actual := exp.ResponseBytes()
+	if bytes.Compare(actual, expected) != 0 {
+		t.Error("\nExpected\n", expected, "\nbut got\n", actual)
+	}
+}

+ 18 - 38
producer_test.go

@@ -1,51 +1,31 @@
 package sarama
 package sarama
 
 
 import (
 import (
-	"encoding/binary"
 	"fmt"
 	"fmt"
+	"github.com/Shopify/sarama/mockbroker"
 	"testing"
 	"testing"
 )
 )
 
 
 func TestSimpleProducer(t *testing.T) {
 func TestSimpleProducer(t *testing.T) {
-	responses := make(chan []byte, 1)
-	extraResponses := make(chan []byte)
-	mockBroker := NewMockBroker(t, responses)
-	mockExtra := NewMockBroker(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
 
 
-	// return the extra mock as another available broker
-	response := []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	responses <- response
-	go func() {
-		for i := 0; i < 10; i++ {
-			msg := []byte{
-				0x00, 0x00, 0x00, 0x01,
-				0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-				0x00, 0x00, 0x00, 0x01,
-				0x00, 0x00, 0x00, 0x00,
-				0x00, 0x00,
-				0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
-			binary.BigEndian.PutUint64(msg[23:], uint64(i))
-			extraResponses <- msg
-		}
-	}()
+	mb1 := mockbroker.New(t, 1)
+	mb2 := mockbroker.New(t, 2)
+	defer mb1.Close()
+	defer mb2.Close()
+
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb2).
+		AddTopicPartition("my_topic", 0, 2)
+
+	// TODO: While the third parameter is the number of messages to expect,
+	// really nothing about the message is actually asserted by the mock. This is
+	// a problem for the future.
+	for i := 0; i < 10; i++ {
+		mb2.ExpectProduceRequest().
+			AddTopicPartition("my_topic", 0, 1, nil)
+	}
 
 
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}