Selaa lähdekoodia

WIP Moving to encoder-based expecations

Burke Libbey 12 vuotta sitten
vanhempi
commit
7a9ae540f0

+ 25 - 0
broker.go

@@ -4,6 +4,8 @@ import (
 	"fmt"
 	"io"
 	"net"
+	"strconv"
+	"strings"
 	"sync"
 )
 
@@ -273,6 +275,29 @@ func (b *Broker) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
+func (b *Broker) encode(pe packetEncoder) (err error) {
+
+	parts := strings.Split(b.addr, ":")
+	if len(parts) != 2 {
+		return fmt.Errorf("invalid addr")
+	}
+	port, err := strconv.Atoi(parts[1])
+	if err != nil {
+		return err
+	}
+
+	pe.putInt32(b.id)
+
+	err = pe.putString(parts[0])
+	if err != nil {
+		return err
+	}
+
+	pe.putInt32(int32(port))
+
+	return nil
+}
+
 func (b *Broker) responseReceiver() {
 	header := make([]byte, 8)
 	for response := range b.responses {

+ 15 - 10
client_test.go

@@ -8,7 +8,7 @@ func TestSimpleClient(t *testing.T) {
 
 	mb := NewMockBroker(t, 1)
 
-	mb.ExpectMetadataRequest()
+	mb.Returns(new(MetadataResponse))
 
 	client, err := NewClient("client_id", []string{mb.Addr()}, nil)
 	if err != nil {
@@ -23,8 +23,9 @@ func TestClientExtraBrokers(t *testing.T) {
 	mb1 := NewMockBroker(t, 1)
 	mb2 := NewMockBroker(t, 2)
 
-	mb1.ExpectMetadataRequest().
-		AddBroker(mb2)
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mb1.Returns(mdr)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
@@ -40,9 +41,10 @@ func TestClientMetadata(t *testing.T) {
 	mb1 := NewMockBroker(t, 1)
 	mb5 := NewMockBroker(t, 5)
 
-	mb1.ExpectMetadataRequest().
-		AddBroker(mb5).
-		AddTopicPartition("my_topic", 0, mb5.BrokerID())
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb5.Addr(), int32(mb5.BrokerID()))
+	mdr.AddTopicPartition("my_topic", 0, int32(mb5.BrokerID()))
+	mb1.Returns(mdr)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
@@ -78,11 +80,14 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	mb1 := NewMockBroker(t, 1)
 	mb5 := NewMockBroker(t, 5)
 
-	mb1.ExpectMetadataRequest().
-		AddBroker(mb5)
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb5.Addr(), int32(mb5.BrokerID()))
+	mb1.Returns(mdr)
 
-	mb5.ExpectMetadataRequest().
-		AddTopicPartition("my_topic", 0xb, 5)
+	mdr2 := new(MetadataResponse)
+	mdr2.AddBroker(mb5.Addr(), int32(mb5.BrokerID()))
+	mdr2.AddTopicPartition("my_topic", 0xb, int32(mb5.BrokerID()))
+	mb5.Returns(mdr2)
 
 	client, err := NewClient("clientID", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1})
 	if err != nil {

+ 18 - 13
consumer_test.go

@@ -38,13 +38,15 @@ func TestSimpleConsumer(t *testing.T) {
 	mb1 := NewMockBroker(t, 1)
 	mb2 := NewMockBroker(t, 2)
 
-	mb1.ExpectMetadataRequest().
-		AddBroker(mb2).
-		AddTopicPartition("my_topic", 0, 2)
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mdr.AddTopicPartition("my_topic", 0, 2)
+	mb2.Returns(mdr)
 
 	for i := 0; i < 10; i++ {
-		mb2.ExpectFetchRequest().
-			AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), uint64(i))
+		fr := new(FetchResponse)
+		fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), uint64(i))
+		mb2.Returns(fr)
 	}
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
@@ -78,9 +80,10 @@ func TestConsumerRawOffset(t *testing.T) {
 	mb1 := NewMockBroker(t, 1)
 	mb2 := NewMockBroker(t, 2)
 
-	mb1.ExpectMetadataRequest().
-		AddBroker(mb2).
-		AddTopicPartition("my_topic", 0, 2)
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mdr.AddTopicPartition("my_topic", 0, 2)
+	mb2.Returns(mdr)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
@@ -107,12 +110,14 @@ func TestConsumerLatestOffset(t *testing.T) {
 	mb1 := NewMockBroker(t, 1)
 	mb2 := NewMockBroker(t, 2)
 
-	mb1.ExpectMetadataRequest().
-		AddBroker(mb2).
-		AddTopicPartition("my_topic", 0, 2)
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mdr.AddTopicPartition("my_topic", 0, 2)
+	mb2.Returns(mdr)
 
-	mb2.ExpectOffsetFetchRequest().
-		AddTopicPartition("my_topic", 0, 0x010101)
+	ofr := new(OffsetFetchResponse)
+	ofr.AddTopicPartition("my_topic", 0, 0x010101)
+	mb2.Returns(ofr)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {

+ 0 - 136
fetch_request_expectation.go

@@ -1,136 +0,0 @@
-package sarama
-
-import (
-	"bytes"
-	"encoding/binary"
-	"hash/crc32"
-)
-
-type FetchRequestExpectation struct {
-	messages []fetchResponseMessage
-}
-
-// this is why single-namespace projects are literally retarded.
-type encoder2 interface {
-	Encode() ([]byte, error)
-}
-
-type fetchResponseMessage struct {
-	topic      string
-	partition  int32
-	key, value encoder2
-	offset     uint64
-}
-
-func (e *FetchRequestExpectation) AddMessage(
-	topic string, partition int32, key, value encoder2, offset uint64,
-) *FetchRequestExpectation {
-	e.messages = append(e.messages, fetchResponseMessage{
-		topic:     topic,
-		partition: partition,
-		key:       key,
-		value:     value,
-		offset:    offset,
-	})
-	return e
-}
-
-func (b *MockBroker) ExpectFetchRequest() *FetchRequestExpectation {
-	e := &FetchRequestExpectation{}
-	b.expectations <- e
-	return e
-}
-
-func (e *FetchRequestExpectation) ResponseBytes() []byte {
-	buf := new(bytes.Buffer)
-
-	byTopic := make(map[string][]fetchResponseMessage)
-	for _, frm := range e.messages {
-		byTopic[frm.topic] = append(byTopic[frm.topic], frm)
-	}
-
-	binary.Write(buf, binary.BigEndian, uint32(len(byTopic)))
-	for topic, messages := range byTopic {
-		binary.Write(buf, binary.BigEndian, uint16(len(topic)))
-		buf.Write([]byte(topic))
-
-		byPartition := make(map[int32][]fetchResponseMessage)
-		for _, frm := range messages {
-			byPartition[frm.partition] = append(byPartition[frm.partition], frm)
-		}
-
-		binary.Write(buf, binary.BigEndian, uint32(len(byPartition)))
-
-		for partition, messages := range byPartition {
-			binary.Write(buf, binary.BigEndian, uint32(partition))
-			binary.Write(buf, binary.BigEndian, uint16(0)) // error
-			binary.Write(buf, binary.BigEndian, uint64(0)) // high water mark offset
-
-			messageSetBuffer := new(bytes.Buffer)
-
-			var maxOffset uint64
-
-			for _, msg := range messages {
-				chunk := new(bytes.Buffer)
-
-				binary.Write(chunk, binary.BigEndian, uint8(0)) // format
-				binary.Write(chunk, binary.BigEndian, uint8(0)) // attribute
-
-				if msg.offset > maxOffset {
-					maxOffset = msg.offset
-				}
-
-				if msg.key == nil {
-					binary.Write(chunk, binary.BigEndian, int32(-1))
-				} else {
-					bytes, _ := msg.key.Encode()
-					binary.Write(chunk, binary.BigEndian, int32(len(bytes)))
-					chunk.Write(bytes)
-				}
-
-				if msg.value == nil {
-					binary.Write(chunk, binary.BigEndian, int32(-1))
-				} else {
-					bytes, _ := msg.value.Encode()
-					binary.Write(chunk, binary.BigEndian, int32(len(bytes)))
-					chunk.Write(bytes)
-				}
-
-				cksum := crc32.ChecksumIEEE(chunk.Bytes())
-				length := len(chunk.Bytes()) + 4
-
-				binary.Write(messageSetBuffer, binary.BigEndian, uint32(length)) // message length
-				binary.Write(messageSetBuffer, binary.BigEndian, uint32(cksum))  // CRC
-				messageSetBuffer.Write(chunk.Bytes())
-			}
-
-			binary.Write(buf, binary.BigEndian, uint32(len(messageSetBuffer.Bytes())+8)) // msgSet size
-			binary.Write(buf, binary.BigEndian, uint64(maxOffset))                       // offset
-			buf.Write(messageSetBuffer.Bytes())
-
-		}
-
-	}
-
-	/*
-		sample response:
-
-		0x00, 0x00, 0x00, 0x01, // number of topics
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of blocks for this topic
-		0x00, 0x00, 0x00, 0x00, // partition id
-		0x00, 0x00, // error
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // high water mark offset
-		// messageSet
-		0x00, 0x00, 0x00, 0x1C, // messageset size
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // offset
-		// message
-		0x00, 0x00, 0x00, 0x10, // length of message (?)
-		0x23, 0x96, 0x4a, 0xf7, // CRC32
-		0x00, // format
-		0x00, // attribute (compression)
-		0xFF, 0xFF, 0xFF, 0xFF, // key (nil)
-		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, // value
-	*/
-	return buf.Bytes()
-}

+ 0 - 36
fetch_request_expectation_test.go

@@ -1,36 +0,0 @@
-package sarama
-
-import (
-	"bytes"
-	"testing"
-)
-
-func TestFetchRequestSerialization(t *testing.T) {
-
-	exp := new(FetchRequestExpectation).
-		AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0xEE}), 3)
-
-	expected := []byte{
-		0x00, 0x00, 0x00, 0x01, // number of topics
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of blocks for this topic
-		0x00, 0x00, 0x00, 0x00, // partition id
-		0x00, 0x00, // error
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // high water mark offset
-		// messageSet
-		0x00, 0x00, 0x00, 0x1C, // messageset size
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, // offset
-		// message
-		0x00, 0x00, 0x00, 0x10, // length of message (?)
-		0x23, 0x96, 0x4a, 0xf7, // CRC32
-		0x00,                   // format
-		0x00,                   // attribute (compression)
-		0xFF, 0xFF, 0xFF, 0xFF, // key (nil)
-		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, // value
-	}
-
-	actual := exp.ResponseBytes()
-	if bytes.Compare(actual, expected) != 0 {
-		t.Error("\nExpected\n", expected, "\nbut got\n", actual)
-	}
-}

+ 39 - 0
fetch_response.go

@@ -36,6 +36,16 @@ type FetchResponse struct {
 	Blocks map[string]map[int32]*FetchResponseBlock
 }
 
+func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
+	pe.putInt16(int16(pr.Err))
+
+	pe.putInt64(pr.HighWaterMarkOffset)
+
+	// TODO: Encode message set
+
+	return nil
+}
+
 func (fr *FetchResponse) decode(pd packetDecoder) (err error) {
 	numTopics, err := pd.getArrayLength()
 	if err != nil {
@@ -74,6 +84,35 @@ func (fr *FetchResponse) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
+func (fr *FetchResponse) encode(pe packetEncoder) (err error) {
+	err = pe.putArrayLength(len(fr.Blocks))
+	if err != nil {
+		return err
+	}
+
+	for topic, partitions := range fr.Blocks {
+		err = pe.putString(topic)
+		if err != nil {
+			return err
+		}
+
+		err = pe.putArrayLength(len(partitions))
+		if err != nil {
+			return err
+		}
+
+		for id, block := range partitions {
+			pe.putInt32(id)
+			err = block.encode(pe)
+			if err != nil {
+				return err
+			}
+		}
+
+	}
+	return nil
+}
+
 func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
 	if fr.Blocks == nil {
 		return nil

+ 0 - 123
metadata_request_expectation.go

@@ -1,123 +0,0 @@
-package sarama
-
-import (
-	"bytes"
-	"encoding/binary"
-)
-
-type MetadataRequestExpectation struct {
-	err             error
-	topicPartitions []metadataRequestTP
-	brokers         []mockbrokerish
-}
-
-type metadataRequestTP struct {
-	topic     string
-	partition int32
-	brokerId  int
-}
-
-type mockbrokerish interface {
-	BrokerID() int
-	Port() int32
-}
-
-func (e *MetadataRequestExpectation) AddBroker(b mockbrokerish) *MetadataRequestExpectation {
-	e.brokers = append(e.brokers, b)
-	return e
-}
-
-func (e *MetadataRequestExpectation) AddTopicPartition(
-	topic string, partition int32, brokerId int,
-) *MetadataRequestExpectation {
-	mrtp := metadataRequestTP{topic, partition, brokerId}
-	e.topicPartitions = append(e.topicPartitions, mrtp)
-	return e
-}
-
-func (e *MetadataRequestExpectation) Error(err error) *MetadataRequestExpectation {
-	e.err = err
-	return e
-}
-
-func (e *MetadataRequestExpectation) ResponseBytes() []byte {
-	buf := new(bytes.Buffer)
-
-	binary.Write(buf, binary.BigEndian, uint32(len(e.brokers)))
-	for _, broker := range e.brokers {
-		binary.Write(buf, binary.BigEndian, uint32(broker.BrokerID()))
-		buf.Write([]byte{0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't'})
-		binary.Write(buf, binary.BigEndian, uint32(broker.Port()))
-	}
-
-	byTopic := make(map[string][]metadataRequestTP)
-	for _, mrtp := range e.topicPartitions {
-		byTopic[mrtp.topic] = append(byTopic[mrtp.topic], mrtp)
-	}
-
-	binary.Write(buf, binary.BigEndian, uint32(len(byTopic)))
-	for topic, tps := range byTopic {
-		// we don't support mocking errors at topic-level; only partition-level
-		binary.Write(buf, binary.BigEndian, uint16(0))
-		binary.Write(buf, binary.BigEndian, uint16(len(topic)))
-		buf.Write([]byte(topic))
-		binary.Write(buf, binary.BigEndian, uint32(len(tps)))
-		for _, tp := range tps {
-			binary.Write(buf, binary.BigEndian, uint16(0)) // TODO: Write the error code instead
-			binary.Write(buf, binary.BigEndian, uint32(tp.partition))
-			binary.Write(buf, binary.BigEndian, uint32(tp.brokerId))
-			binary.Write(buf, binary.BigEndian, uint32(0)) // replica set
-			binary.Write(buf, binary.BigEndian, uint32(0)) // ISR set
-		}
-	}
-
-	// Sample response:
-	/*
-		0x00, 0x00, 0x00, 0x02, // 0:3 number of brokers
-
-		0x00, 0x00, 0x00, 0x01, // 4:7 broker ID
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 8:18 hostname
-		0xFF, 0xFF, 0xFF, 0xFF, // 19:22 port will be written here.
-
-		0x00, 0x00, 0x00, 0x02, // 23:26 broker ID
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 27:37 hostname
-		0xFF, 0xFF, 0xFF, 0xFF, // 38:41 port will be written here.
-
-		0x00, 0x00, 0x00, 0x03, // number of topic metadata records
-
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x00, 0x00, 0x00, // partition ID
-		0x00, 0x00, 0x00, 0x01, // broker ID of leader
-		0x00, 0x00, 0x00, 0x00, // replica set
-		0x00, 0x00, 0x00, 0x00, // ISR set
-
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x00, 0x00, 0x00, // partition ID
-		0x00, 0x00, 0x00, 0x02, // broker ID of leader
-		0x00, 0x00, 0x00, 0x00, // replica set
-		0x00, 0x00, 0x00, 0x00, // ISR set
-
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x00, 0x00, 0x00, // partition ID
-		0x00, 0x00, 0x00, 0x02, // broker ID of leader
-		0x00, 0x00, 0x00, 0x00, // replica set
-		0x00, 0x00, 0x00, 0x00, // ISR set
-	*/
-
-	return buf.Bytes()
-}
-
-func (b *MockBroker) ExpectMetadataRequest() *MetadataRequestExpectation {
-	e := &MetadataRequestExpectation{}
-	b.expectations <- e
-	return e
-}

+ 0 - 70
metadata_request_expectation_test.go

@@ -1,70 +0,0 @@
-package sarama
-
-import (
-	"bytes"
-	"testing"
-)
-
-type mockMockBroker struct {
-	id   int
-	port int32
-}
-
-func (b mockMockBroker) BrokerID() int { return b.id }
-func (b mockMockBroker) Port() int32   { return b.port }
-
-func TestMetadataRequestSerialization(t *testing.T) {
-
-	exp := new(MetadataRequestExpectation).
-		AddBroker(mockMockBroker{1, 8080}).
-		AddBroker(mockMockBroker{2, 8081}).
-		AddTopicPartition("topic_a", 0, 1).
-		AddTopicPartition("topic_b", 0, 2).
-		AddTopicPartition("topic_c", 0, 2)
-
-	expected := []byte{
-		0x00, 0x00, 0x00, 0x02, // 0:3 number of brokers
-
-		0x00, 0x00, 0x00, 0x01, // 4:7 broker ID
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 8:18 hostname
-		0x00, 0x00, 0x1F, 0x90, // 19:22 port
-
-		0x00, 0x00, 0x00, 0x02, // 23:26 broker ID
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 27:37 hostname
-		0x00, 0x00, 0x1F, 0x91, // 38:41 port
-
-		0x00, 0x00, 0x00, 0x03, // number of topic metadata records
-
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x00, 0x00, 0x00, // partition ID
-		0x00, 0x00, 0x00, 0x01, // broker ID of leader
-		0x00, 0x00, 0x00, 0x00, // replica set
-		0x00, 0x00, 0x00, 0x00, // ISR set
-
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x00, 0x00, 0x00, // partition ID
-		0x00, 0x00, 0x00, 0x02, // broker ID of leader
-		0x00, 0x00, 0x00, 0x00, // replica set
-		0x00, 0x00, 0x00, 0x00, // ISR set
-
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x00, 0x00, 0x00, // partition ID
-		0x00, 0x00, 0x00, 0x02, // broker ID of leader
-		0x00, 0x00, 0x00, 0x00, // replica set
-		0x00, 0x00, 0x00, 0x00, // ISR set
-	}
-
-	actual := exp.ResponseBytes()
-	if bytes.Compare(actual, expected) != 0 {
-		t.Errorf("\nExpected\n% 2x\nbut got\n% 2x", expected, actual)
-	}
-}

+ 107 - 0
metadata_response.go

@@ -38,6 +38,24 @@ func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
+func (pm *PartitionMetadata) encode(pe packetEncoder) (err error) {
+	pe.putInt16(int16(pm.Err))
+	pe.putInt32(pm.ID)
+	pe.putInt32(pm.Leader)
+
+	err = pe.putInt32Array(pm.Replicas)
+	if err != nil {
+		return err
+	}
+
+	err = pe.putInt32Array(pm.Isr)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
 type TopicMetadata struct {
 	Err        KError
 	Name       string
@@ -72,6 +90,29 @@ func (tm *TopicMetadata) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
+func (tm *TopicMetadata) encode(pe packetEncoder) (err error) {
+	pe.putInt16(int16(tm.Err))
+
+	err = pe.putString(tm.Name)
+	if err != nil {
+		return err
+	}
+
+	err = pe.putArrayLength(len(tm.Partitions))
+	if err != nil {
+		return err
+	}
+
+	for _, pm := range tm.Partitions {
+		err = pm.encode(pe)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 type MetadataResponse struct {
 	Brokers []*Broker
 	Topics  []*TopicMetadata
@@ -108,3 +149,69 @@ func (m *MetadataResponse) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+func (m *MetadataResponse) encode(pe packetEncoder) error {
+	err := pe.putArrayLength(len(m.Brokers))
+	if err != nil {
+		return err
+	}
+	for _, broker := range m.Brokers {
+		err = broker.encode(pe)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+
+	err = pe.putArrayLength(len(m.Topics))
+	if err != nil {
+		return err
+	}
+	for _, tm := range m.Topics {
+		err = tm.encode(pe)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// testing API
+
+func (m *MetadataResponse) AddBroker(addr string, id int32) {
+	m.Brokers = append(m.Brokers, &Broker{id: id, addr: addr})
+}
+
+func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32) {
+	var match *TopicMetadata
+
+	for _, tm := range m.Topics {
+		if tm.Name == topic {
+			match = tm
+			goto foundTopic
+		}
+	}
+
+	match = new(TopicMetadata)
+	m.Topics = append(m.Topics, match)
+
+foundTopic:
+
+	var pmatch *PartitionMetadata
+
+	for _, pm := range match.Partitions {
+		if pm.ID == partition {
+			pmatch = pm
+			goto foundPartition
+		}
+	}
+
+	pmatch = new(PartitionMetadata)
+	match.Partitions = append(match.Partitions, pmatch)
+
+foundPartition:
+
+	pmatch.Leader = brokerID
+
+}

+ 4 - 0
mockbroker.go

@@ -154,3 +154,7 @@ func NewMockBroker(t *testing.T, brokerID int) *MockBroker {
 
 	return broker
 }
+
+func (b *MockBroker) Returns(e encoder) {
+
+}

+ 0 - 65
offset_fetch_request_expectation.go

@@ -1,65 +0,0 @@
-package sarama
-
-import (
-	"bytes"
-	"encoding/binary"
-)
-
-type OffsetFetchRequestExpectation struct {
-	topicPartitions []offsetFetchRequestTP
-}
-
-type offsetFetchRequestTP struct {
-	topic     string
-	partition int32
-	offset    uint64
-}
-
-func (e *OffsetFetchRequestExpectation) AddTopicPartition(
-	topic string, partition int32, offset uint64,
-) *OffsetFetchRequestExpectation {
-	ofrtp := offsetFetchRequestTP{topic, partition, offset}
-	e.topicPartitions = append(e.topicPartitions, ofrtp)
-	return e
-}
-
-func (b *MockBroker) ExpectOffsetFetchRequest() *OffsetFetchRequestExpectation {
-	e := &OffsetFetchRequestExpectation{}
-	b.expectations <- e
-	return e
-}
-
-func (e *OffsetFetchRequestExpectation) ResponseBytes() []byte {
-	buf := new(bytes.Buffer)
-
-	byTopic := make(map[string][]offsetFetchRequestTP)
-	for _, ofrtp := range e.topicPartitions {
-		byTopic[ofrtp.topic] = append(byTopic[ofrtp.topic], ofrtp)
-	}
-
-	binary.Write(buf, binary.BigEndian, uint32(len(byTopic)))
-	for topic, tps := range byTopic {
-		binary.Write(buf, binary.BigEndian, uint16(len(topic)))
-		buf.Write([]byte(topic))
-		binary.Write(buf, binary.BigEndian, uint32(len(tps)))
-		for _, tp := range tps {
-			binary.Write(buf, binary.BigEndian, uint32(tp.partition))
-			binary.Write(buf, binary.BigEndian, uint16(0)) // error
-			binary.Write(buf, binary.BigEndian, uint32(1))
-			binary.Write(buf, binary.BigEndian, uint64(tp.offset)) // offset
-		}
-	}
-
-	/*
-		sample response:
-
-		0x00, 0x00, 0x00, 0x01, // number of topics
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of blocks for this partition
-		0x00, 0x00, 0x00, 0x00, // partition id
-		0x00, 0x00, // error
-		0x00, 0x00, 0x00, 0x01, // number of offsets
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, // offset
-	*/
-	return buf.Bytes()
-}

+ 0 - 27
offset_fetch_request_expectation_test.go

@@ -1,27 +0,0 @@
-package sarama
-
-import (
-	"bytes"
-	"testing"
-)
-
-func TestOffsetFetchRequestSerialization(t *testing.T) {
-
-	exp := new(OffsetFetchRequestExpectation).
-		AddTopicPartition("my_topic", 0, 0x010101)
-
-	expected := []byte{
-		0x00, 0x00, 0x00, 0x01, // number of topics
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of blocks for this partition
-		0x00, 0x00, 0x00, 0x00, // partition id
-		0x00, 0x00, // error
-		0x00, 0x00, 0x00, 0x01, // number of offsets
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, // offset
-	}
-
-	actual := exp.ResponseBytes()
-	if bytes.Compare(actual, expected) != 0 {
-		t.Error("\nExpected\n", expected, "\nbut got\n", actual)
-	}
-}

+ 58 - 0
offset_fetch_response.go

@@ -26,6 +26,19 @@ func (r *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
+func (r *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
+	pe.putInt64(r.Offset)
+
+	err = pe.putString(r.Metadata)
+	if err != nil {
+		return err
+	}
+
+	pe.putInt16(int16(r.Err))
+
+	return nil
+}
+
 type OffsetFetchResponse struct {
 	ClientID string
 	Blocks   map[string]map[int32]*OffsetFetchResponseBlock
@@ -73,3 +86,48 @@ func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+func (r *OffsetFetchResponse) encode(pe packetEncoder) (err error) {
+	err = pe.putString(r.ClientID)
+	if err != nil {
+		return err
+	}
+
+	err = pe.putArrayLength(len(r.Blocks))
+	if err != nil {
+		return err
+	}
+
+	for topic, partitions := range r.Blocks {
+		err = pe.putString(topic)
+		if err != nil {
+			return err
+		}
+
+		err = pe.putArrayLength(len(partitions))
+		if err != nil {
+			return err
+		}
+
+		for id, block := range partitions {
+			pe.putInt32(id)
+			err = block.encode(pe)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+// testing API
+
+func (r *OffsetFetchResponse) AddTopicPartition(topic string, partition int32, offset int64) {
+	byTopic, ok := r.Blocks[topic]
+	if !ok {
+		byTopic = make(map[int32]*OffsetFetchResponseBlock)
+		r.Blocks[topic] = byTopic
+	}
+	byTopic[partition] = &OffsetFetchResponseBlock{Offset: offset}
+}

+ 0 - 77
produce_request_expectation.go

@@ -1,77 +0,0 @@
-package sarama
-
-import (
-	"bytes"
-	"encoding/binary"
-)
-
-type ProduceRequestExpectation struct {
-	err             error
-	topicPartitions []produceRequestTP
-}
-
-type produceRequestTP struct {
-	topic     string
-	partition int32
-	nMessages int
-	err       KError
-}
-
-func (e *ProduceRequestExpectation) AddTopicPartition(
-	topic string, partition int32, nMessages int, err KError,
-) *ProduceRequestExpectation {
-	prtp := produceRequestTP{topic, partition, nMessages, err}
-	e.topicPartitions = append(e.topicPartitions, prtp)
-	return e
-}
-
-func (b *MockBroker) ExpectProduceRequest() *ProduceRequestExpectation {
-	e := &ProduceRequestExpectation{}
-	b.expectations <- e
-	return e
-}
-
-func (e *ProduceRequestExpectation) ResponseBytes() []byte {
-	buf := new(bytes.Buffer)
-
-	byTopic := make(map[string][]produceRequestTP)
-	for _, prtp := range e.topicPartitions {
-		byTopic[prtp.topic] = append(byTopic[prtp.topic], prtp)
-	}
-
-	binary.Write(buf, binary.BigEndian, uint32(len(byTopic)))
-	for topic, tps := range byTopic {
-		binary.Write(buf, binary.BigEndian, uint16(len(topic)))
-		buf.Write([]byte(topic)) // TODO: Does this write a null?
-		binary.Write(buf, binary.BigEndian, uint32(len(tps)))
-		for _, tp := range tps {
-			binary.Write(buf, binary.BigEndian, uint32(tp.partition))
-			binary.Write(buf, binary.BigEndian, uint16(tp.err)) // TODO: error
-			binary.Write(buf, binary.BigEndian, uint64(0))      // offset
-		}
-	}
-
-	/*
-	   sample response:
-
-	   0x00, 0x00, 0x00, 0x02, // 0:3 number of topics
-
-	   0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // 4:12 topic name
-	   0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
-	   0x00, 0x00, 0x00, 0x00, // 17:20 partition id
-	   0x00, 0x00, // 21:22 error: 0 means no error
-	   0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
-
-	   0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // 4:12 topic name
-	   0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
-	   0x00, 0x00, 0x00, 0x00, // 17:20 partition id
-	   0x00, 0x00, // 21:22 error: 0 means no error
-	   0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
-	*/
-	return buf.Bytes()
-}
-
-func (e *ProduceRequestExpectation) Error(err error) *ProduceRequestExpectation {
-	e.err = err
-	return e
-}

+ 0 - 34
produce_request_expectation_test.go

@@ -1,34 +0,0 @@
-package sarama
-
-import (
-	"bytes"
-	"testing"
-)
-
-func TestProduceRequestSerialization(t *testing.T) {
-
-	exp := new(ProduceRequestExpectation).
-		AddTopicPartition("topic_b", 0, 1, NoError).
-		AddTopicPartition("topic_c", 0, 1, NoError)
-
-	expected := []byte{
-		0x00, 0x00, 0x00, 0x02, // 0:3 number of topics
-
-		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // 4:12 topic name
-		0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
-		0x00, 0x00, 0x00, 0x00, // 17:20 partition id
-		0x00, 0x00, // 21:22 error: 0 means no error
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 23:30 offset
-
-		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of blocks for this topic
-		0x00, 0x00, 0x00, 0x00, // partition id
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // offset
-	}
-
-	actual := exp.ResponseBytes()
-	if bytes.Compare(actual, expected) != 0 {
-		t.Error("\nExpected\n", expected, "\nbut got\n", actual)
-	}
-}

+ 34 - 0
produce_response.go

@@ -62,6 +62,29 @@ func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
+func (pr *ProduceResponse) encode(pe packetEncoder) error {
+	err := pe.putArrayLength(len(pr.Blocks))
+	if err != nil {
+		return err
+	}
+	for topic, partitions := range pr.Blocks {
+		err = pe.putString(topic)
+		if err != nil {
+			return err
+		}
+		err = pe.putArrayLength(len(partitions))
+		if err != nil {
+			return err
+		}
+		for id, prb := range partitions {
+			pe.putInt32(id)
+			pe.putInt16(int16(prb.Err))
+			pe.putInt64(prb.Offset)
+		}
+	}
+	return nil
+}
+
 func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
 	if pr.Blocks == nil {
 		return nil
@@ -73,3 +96,14 @@ func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceRespo
 
 	return pr.Blocks[topic][partition]
 }
+
+// Testing API
+
+func (pr *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) {
+	byTopic, ok := pr.Blocks[topic]
+	if !ok {
+		byTopic = make(map[int32]*ProduceResponseBlock)
+		pr.Blocks[topic] = byTopic
+	}
+	byTopic[partition] = &ProduceResponseBlock{Err: err}
+}

+ 78 - 48
producer_test.go

@@ -15,12 +15,14 @@ func TestSimpleProducer(t *testing.T) {
 	defer mb1.Close()
 	defer mb2.Close()
 
-	mb1.ExpectMetadataRequest().
-		AddBroker(mb2).
-		AddTopicPartition("my_topic", 0, 2)
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mdr.AddTopicPartition("my_topic", 0, 2)
+	mb1.Returns(mdr)
 
-	mb2.ExpectProduceRequest().
-		AddTopicPartition("my_topic", 0, 1, NoError)
+	pr := new(ProduceResponse)
+	pr.AddTopicPartition("my_topic", 0, NoError)
+	mb2.Returns(pr)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
@@ -49,13 +51,16 @@ func TestSimpleSyncProducer(t *testing.T) {
 	defer mb1.Close()
 	defer mb2.Close()
 
-	mb1.ExpectMetadataRequest().
-		AddBroker(mb2).
-		AddTopicPartition("my_topic", 1, 2)
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mdr.AddTopicPartition("my_topic", 1, 2)
+	mb1.Returns(mdr)
+
+	pr := new(ProduceResponse)
+	pr.AddTopicPartition("my_topic", 1, NoError)
 
 	for i := 0; i < 10; i++ {
-		mb2.ExpectProduceRequest().
-			AddTopicPartition("my_topic", 1, 10, NoError)
+		mb2.Returns(pr)
 	}
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
@@ -83,17 +88,16 @@ func TestMultipleFlushes(t *testing.T) {
 	defer mb1.Close()
 	defer mb2.Close()
 
-	mb1.ExpectMetadataRequest().
-		AddBroker(mb2).
-		AddTopicPartition("my_topic", 0, 2)
-
-	mb2.ExpectProduceRequest().
-		AddTopicPartition("my_topic", 0, 1, NoError).
-		AddTopicPartition("my_topic", 0, 1, NoError)
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mdr.AddTopicPartition("my_topic", 0, 2)
+	mb1.Returns(mdr)
 
-	mb2.ExpectProduceRequest().
-		AddTopicPartition("my_topic", 0, 1, NoError).
-		AddTopicPartition("my_topic", 0, 1, NoError)
+	pr := new(ProduceResponse)
+	pr.AddTopicPartition("my_topic", 0, NoError)
+	pr.AddTopicPartition("my_topic", 0, NoError)
+	mb2.Returns(pr)
+	mb2.Returns(pr) // yes, twice.
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
@@ -123,19 +127,22 @@ func TestMultipleProducer(t *testing.T) {
 	defer mb2.Close()
 	defer mb3.Close()
 
-	mb1.ExpectMetadataRequest().
-		AddBroker(mb2).
-		AddBroker(mb3).
-		AddTopicPartition("topic_a", 0, 2).
-		AddTopicPartition("topic_b", 0, 3).
-		AddTopicPartition("topic_c", 0, 3)
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mdr.AddBroker(mb3.Addr(), int32(mb3.BrokerID()))
+	mdr.AddTopicPartition("topic_a", 0, 2)
+	mdr.AddTopicPartition("topic_b", 0, 3)
+	mdr.AddTopicPartition("topic_c", 0, 3)
+	mb1.Returns(mdr)
 
-	mb2.ExpectProduceRequest().
-		AddTopicPartition("topic_a", 0, 1, NoError)
+	pr1 := new(ProduceResponse)
+	pr1.AddTopicPartition("topic_a", 0, NoError)
+	mb2.Returns(pr1)
 
-	mb3.ExpectProduceRequest().
-		AddTopicPartition("topic_b", 0, 1, NoError).
-		AddTopicPartition("topic_c", 0, 1, NoError)
+	pr2 := new(ProduceResponse)
+	pr2.AddTopicPartition("topic_b", 0, NoError)
+	pr2.AddTopicPartition("topic_c", 0, NoError)
+	mb3.Returns(pr2)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
@@ -178,28 +185,51 @@ func TestFailureRetry(t *testing.T) {
 	mb2 := NewMockBroker(t, 2)
 	mb3 := NewMockBroker(t, 3)
 
-	mb1.ExpectMetadataRequest().
-		AddBroker(mb2).
-		AddBroker(mb3).
-		AddTopicPartition("topic_a", 0, 2).
-		AddTopicPartition("topic_b", 0, 2).
-		AddTopicPartition("topic_c", 0, 3)
-
-	mb2.ExpectProduceRequest().
-		AddTopicPartition("topic_a", 0, 1, NoError).
-		AddTopicPartition("topic_b", 0, 1, NotLeaderForPartition)
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mdr.AddBroker(mb3.Addr(), int32(mb3.BrokerID()))
+	mdr.AddTopicPartition("topic_a", 0, 2)
+	mdr.AddTopicPartition("topic_b", 0, 3)
+	mdr.AddTopicPartition("topic_c", 0, 3)
+	mb1.Returns(mdr)
+
+	/* mb1.ExpectMetadataRequest(). */
+	/* 	AddBroker(mb2). */
+	/* 	AddBroker(mb3). */
+	/* 	AddTopicPartition("topic_a", 0, 2). */
+	/* 	AddTopicPartition("topic_b", 0, 2). */
+	/* 	AddTopicPartition("topic_c", 0, 3) */
+
+	pr := new(ProduceResponse)
+	pr.AddTopicPartition("topic_a", 0, NoError)
+	pr.AddTopicPartition("topic_a", 0, NotLeaderForPartition)
+	mb2.Returns(pr)
+
+	/* mb2.ExpectProduceRequest(). */
+	/* 	AddTopicPartition("topic_a", 0, 1, NoError). */
+	/* 	AddTopicPartition("topic_b", 0, 1, NotLeaderForPartition) */
 
 	// The fact that mb2 is chosen here is not well-defined. In theory,
 	// it's a random choice between mb1, mb2, and mb3. Go's hash iteration
 	// isn't quite as random as claimed, though, it seems. Maybe because
 	// the same random seed is used each time?
-	mb2.ExpectMetadataRequest().
-		AddBroker(mb3).
-		AddTopicPartition("topic_b", 0, 3)
-
-	mb3.ExpectProduceRequest().
-		AddTopicPartition("topic_c", 0, 1, NoError).
-		AddTopicPartition("topic_b", 0, 1, NoError)
+	mdr2 := new(MetadataResponse)
+	mdr2.AddBroker(mb3.Addr(), int32(mb3.BrokerID()))
+	mdr2.AddTopicPartition("topic_b", 0, 3)
+	mb2.Returns(mdr2)
+
+	/* mb2.ExpectMetadataRequest(). */
+	/* 	AddBroker(mb3). */
+	/* 	AddTopicPartition("topic_b", 0, 3) */
+
+	pr2 := new(ProduceResponse)
+	pr2.AddTopicPartition("topic_c", 0, NoError)
+	pr2.AddTopicPartition("topic_b", 0, NoError)
+	mb3.Returns(pr2)
+
+	/* mb3.ExpectProduceRequest(). */
+	/* 	AddTopicPartition("topic_c", 0, 1, NoError). */
+	/* 	AddTopicPartition("topic_b", 0, 1, NoError) */
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {