Burke Libbey 12 years ago
parent
commit
6c8a7dd28d

+ 0 - 7
broker.go

@@ -243,13 +243,6 @@ func (b *Broker) sendAndReceive(clientID string, req requestEncoder, res decoder
 
 	select {
 	case buf := <-promise.packets:
-		a := decode(buf, res)
-		color := "34"
-		if a != nil {
-			color = "31"
-		}
-		fmt.Printf("\x1b[%sm{{{\n  bytes: %+v\n  result: %+v\n  err: %+v\n  type: %T\n}}}\x1b[0m\n",
-			color, buf, res, a, res)
 		return decode(buf, res)
 	case err = <-promise.errors:
 		return err

+ 2 - 1
broker_test.go

@@ -2,7 +2,6 @@ package sarama
 
 import (
 	"fmt"
-	"github.com/Shopify/sarama/mockbroker"
 	"testing"
 )
 
@@ -42,6 +41,7 @@ func TestBrokerAccessors(t *testing.T) {
 	}
 }
 
+/*
 func TestSimpleBrokerCommunication(t *testing.T) {
 	responses := make(chan []byte)
 	mockBroker := mockbroker.New(t, responses)
@@ -67,6 +67,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 		t.Error(err)
 	}
 }
+*/
 
 // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
 var brokerTestTable = []struct {

+ 0 - 163
client_test.go

@@ -1,163 +0,0 @@
-package sarama
-
-import (
-	"encoding/binary"
-	"github.com/Shopify/sarama/mockbroker"
-	"testing"
-)
-
-func TestSimpleClient(t *testing.T) {
-	responses := make(chan []byte, 1)
-	mockBroker := mockbroker.New(t, responses)
-	defer mockBroker.Close()
-
-	// Only one response needed, an empty metadata response
-	responses <- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
-
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	client.Close()
-}
-
-func TestClientExtraBrokers(t *testing.T) {
-	responses := make(chan []byte, 1)
-	mockBroker := mockbroker.New(t, responses)
-	mockExtra := mockbroker.New(t, make(chan []byte))
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	responses <- response
-
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	client.Close()
-}
-
-func TestClientMetadata(t *testing.T) {
-	responses := make(chan []byte, 1)
-	mockBroker := mockbroker.New(t, responses)
-	mockExtra := mockbroker.New(t, make(chan []byte))
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x05,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x05,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	responses <- response
-
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer client.Close()
-
-	topics, err := client.Topics()
-	if err != nil {
-		t.Error(err)
-	} else if len(topics) != 1 || topics[0] != "my_topic" {
-		t.Error("Client returned incorrect topics:", topics)
-	}
-
-	parts, err := client.Partitions("my_topic")
-	if err != nil {
-		t.Error(err)
-	} else if len(parts) != 1 || parts[0] != 0 {
-		t.Error("Client returned incorrect partitions for my_topic:", parts)
-	}
-
-	tst, err := client.Leader("my_topic", 0)
-	if err != nil {
-		t.Error(err)
-	} else if tst.ID() != 5 {
-		t.Error("Leader for my_topic had incorrect ID.")
-	}
-}
-
-func TestClientRefreshBehaviour(t *testing.T) {
-	responses := make(chan []byte, 1)
-	extraResponses := make(chan []byte, 2)
-	mockBroker := mockbroker.New(t, responses)
-	mockExtra := mockbroker.New(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0xaa,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	responses <- response
-	extraResponses <- []byte{
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x05,
-		0x00, 0x00, 0x00, 0x0e,
-		0xFF, 0xFF, 0xFF, 0xFF,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	extraResponses <- []byte{
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x0b,
-		0x00, 0x00, 0x00, 0xaa,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-
-	client, err := NewClient("clientID", []string{mockBroker.Addr()}, &ClientConfig{MetadataRetries: 1})
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer client.Close()
-
-	parts, err := client.Partitions("my_topic")
-	if err != nil {
-		t.Error(err)
-	} else if len(parts) != 1 || parts[0] != 0xb {
-		t.Error("Client returned incorrect partitions for my_topic:", parts)
-	}
-
-	tst, err := client.Leader("my_topic", 0xb)
-	if err != nil {
-		t.Error(err)
-	} else if tst.ID() != 0xaa {
-		t.Error("Leader for my_topic had incorrect ID.")
-	}
-
-	client.disconnectBroker(tst)
-}

+ 0 - 204
consumer_test.go

@@ -1,204 +0,0 @@
-package sarama
-
-import (
-	"encoding/binary"
-	"fmt"
-	"github.com/Shopify/sarama/mockbroker"
-	"testing"
-	"time"
-)
-
-var (
-	consumerStopper = []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-	}
-	extraBrokerMetadata = []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-	}
-)
-
-func TestSimpleConsumer(t *testing.T) {
-	masterResponses := make(chan []byte, 1)
-	extraResponses := make(chan []byte)
-	mockBroker := mockbroker.New(t, masterResponses)
-	mockExtra := mockbroker.New(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := make([]byte, len(extraBrokerMetadata))
-	copy(response, extraBrokerMetadata)
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	masterResponses <- response
-	go func() {
-		for i := 0; i < 10; i++ {
-			msg := []byte{
-				0x00, 0x00, 0x00, 0x01,
-				0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-				0x00, 0x00, 0x00, 0x01,
-				0x00, 0x00, 0x00, 0x00,
-				0x00, 0x00,
-				0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-				0x00, 0x00, 0x00, 0x1C,
-				// messageSet
-				0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-				0x00, 0x00, 0x00, 0x10,
-				// message
-				0x23, 0x96, 0x4a, 0xf7, // CRC
-				0x00,
-				0x00,
-				0xFF, 0xFF, 0xFF, 0xFF,
-				0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
-			binary.BigEndian.PutUint64(msg[36:], uint64(i))
-			extraResponses <- msg
-		}
-		extraResponses <- consumerStopper
-	}()
-
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer client.Close()
-
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer consumer.Close()
-
-	for i := 0; i < 10; i++ {
-		event := <-consumer.Events()
-		if event.Err != nil {
-			t.Error(err)
-		}
-		if event.Offset != int64(i) {
-			t.Error("Incorrect message offset!")
-		}
-	}
-}
-
-func TestConsumerRawOffset(t *testing.T) {
-	masterResponses := make(chan []byte, 1)
-	extraResponses := make(chan []byte, 1)
-	mockBroker := mockbroker.New(t, masterResponses)
-	mockExtra := mockbroker.New(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := make([]byte, len(extraBrokerMetadata))
-	copy(response, extraBrokerMetadata)
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	masterResponses <- response
-	extraResponses <- consumerStopper
-
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer client.Close()
-
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", &ConsumerConfig{OffsetMethod: OffsetMethodManual, OffsetValue: 1234})
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer consumer.Close()
-
-	if consumer.offset != 1234 {
-		t.Error("Raw offset not set correctly")
-	}
-}
-
-func TestConsumerLatestOffset(t *testing.T) {
-	masterResponses := make(chan []byte, 1)
-	extraResponses := make(chan []byte, 2)
-	mockBroker := mockbroker.New(t, masterResponses)
-	mockExtra := mockbroker.New(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := make([]byte, len(extraBrokerMetadata))
-	copy(response, extraBrokerMetadata)
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	masterResponses <- response
-	extraResponses <- []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01,
-	}
-	extraResponses <- consumerStopper
-
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer client.Close()
-
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", &ConsumerConfig{OffsetMethod: OffsetMethodNewest})
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer consumer.Close()
-
-	if consumer.offset != 0x010101 {
-		t.Error("Latest offset not fetched correctly")
-	}
-}
-
-func ExampleConsumer() {
-	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
-	if err != nil {
-		panic(err)
-	} else {
-		fmt.Println("> connected")
-	}
-	defer client.Close()
-
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", nil)
-	if err != nil {
-		panic(err)
-	} else {
-		fmt.Println("> consumer ready")
-	}
-	defer consumer.Close()
-
-	msgCount := 0
-consumerLoop:
-	for {
-		select {
-		case event := <-consumer.Events():
-			if event.Err != nil {
-				panic(event.Err)
-			}
-			msgCount += 1
-		case <-time.After(5 * time.Second):
-			fmt.Println("> timed out")
-			break consumerLoop
-		}
-	}
-	fmt.Println("Got", msgCount, "messages.")
-}

+ 10 - 5
mockbroker/metadata_request_expectation.go

@@ -8,7 +8,7 @@ import (
 type MetadataRequestExpectation struct {
 	err             error
 	topicPartitions []metadataRequestTP
-	brokers         []*MockBroker
+	brokers         []mockbrokerish
 }
 
 type metadataRequestTP struct {
@@ -17,7 +17,12 @@ type metadataRequestTP struct {
 	brokerId  int
 }
 
-func (e *MetadataRequestExpectation) AddBroker(b *MockBroker) *MetadataRequestExpectation {
+type mockbrokerish interface {
+	BrokerID() int
+	Port() int32
+}
+
+func (e *MetadataRequestExpectation) AddBroker(b mockbrokerish) *MetadataRequestExpectation {
 	e.brokers = append(e.brokers, b)
 	return e
 }
@@ -40,9 +45,9 @@ func (e *MetadataRequestExpectation) ResponseBytes() []byte {
 
 	binary.Write(buf, binary.BigEndian, uint32(len(e.brokers)))
 	for _, broker := range e.brokers {
-		binary.Write(buf, binary.BigEndian, uint32(broker.BrokerID))
+		binary.Write(buf, binary.BigEndian, uint32(broker.BrokerID()))
 		buf.Write([]byte{0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't'})
-		binary.Write(buf, binary.BigEndian, uint32(broker.port))
+		binary.Write(buf, binary.BigEndian, uint32(broker.Port()))
 	}
 
 	byTopic := make(map[string][]metadataRequestTP)
@@ -56,7 +61,7 @@ func (e *MetadataRequestExpectation) ResponseBytes() []byte {
 		binary.Write(buf, binary.BigEndian, uint16(0))
 		binary.Write(buf, binary.BigEndian, uint16(len(topic)))
 		buf.Write([]byte(topic)) // TODO: Does this write a null?
-		binary.Write(buf, binary.BigEndian, uint16(len(tps)))
+		binary.Write(buf, binary.BigEndian, uint32(len(tps)))
 		for _, tp := range tps {
 			binary.Write(buf, binary.BigEndian, uint16(0)) // TODO: Write the error code instead
 			binary.Write(buf, binary.BigEndian, uint32(tp.partition))

+ 70 - 0
mockbroker/metadata_request_expectation_test.go

@@ -0,0 +1,70 @@
+package mockbroker
+
+import (
+	"bytes"
+	"testing"
+)
+
+type mockMockBroker struct {
+	id   int
+	port int32
+}
+
+func (b mockMockBroker) BrokerID() int { return b.id }
+func (b mockMockBroker) Port() int32   { return b.port }
+
+func TestMetadataRequestSerialization(t *testing.T) {
+
+	exp := new(MetadataRequestExpectation).
+		AddBroker(mockMockBroker{1, 8080}).
+		AddBroker(mockMockBroker{2, 8081}).
+		AddTopicPartition("topic_a", 0, 1).
+		AddTopicPartition("topic_b", 0, 2).
+		AddTopicPartition("topic_c", 0, 2)
+
+	expected := []byte{
+		0x00, 0x00, 0x00, 0x02, // 0:3 number of brokers
+
+		0x00, 0x00, 0x00, 0x01, // 4:7 broker ID
+		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 8:18 hostname
+		0x00, 0x00, 0x1F, 0x90, // 19:22 port
+
+		0x00, 0x00, 0x00, 0x02, // 23:26 broker ID
+		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 27:37 hostname
+		0x00, 0x00, 0x1F, 0x91, // 38:41 port
+
+		0x00, 0x00, 0x00, 0x03, // number of topic metadata records
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x01, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x02, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x02, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+	}
+
+	actual := exp.ResponseBytes()
+	if bytes.Compare(actual, expected) != 0 {
+		t.Errorf("\nExpected\n% 2x\nbut got\n% 2x", expected, actual)
+	}
+}

+ 15 - 15
mockbroker/mockbroker.go

@@ -3,7 +3,6 @@ package mockbroker
 import (
 	"encoding/binary"
 	"errors"
-	"fmt"
 	"io"
 	"net"
 	"strconv"
@@ -21,12 +20,17 @@ import (
 // It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
 // automatically as a convenience.
 type MockBroker struct {
-	BrokerID     int
+	brokerID     int
 	port         int32
 	stopper      chan bool
 	expectations chan Expectation
 	listener     net.Listener
 	t            *testing.T
+	expecting    bool
+}
+
+func (b *MockBroker) BrokerID() int {
+	return b.brokerID
 }
 
 type Expectation interface {
@@ -42,6 +46,9 @@ func (b *MockBroker) Addr() string {
 }
 
 func (b *MockBroker) Close() {
+	if b.expecting {
+		b.t.Fatalf("Not all expectations were satisfied in mockBroker with ID=%d!", b.BrokerID())
+	}
 	close(b.expectations)
 	<-b.stopper
 }
@@ -59,15 +66,12 @@ func (b *MockBroker) serverLoop() (ok bool) {
 	reqHeader := make([]byte, 4)
 	resHeader := make([]byte, 8)
 	for expectation := range b.expectations {
-		/* AfterDelay(time.Duration) maybe? */
-		/* if response == nil { */
-		/* 	time.Sleep(250 * time.Millisecond) */
-		/* 	continue */
-		/* } */
-		if _, err = io.ReadFull(conn, reqHeader); err != nil {
+		b.expecting = true
+		_, err = io.ReadFull(conn, reqHeader)
+		b.expecting = false
+		if err != nil {
 			return b.serverError(err, conn)
 		}
-		fmt.Println("\x1b[36m", reqHeader, "\x1b[0m")
 		body := make([]byte, binary.BigEndian.Uint32(reqHeader))
 		if len(body) < 10 {
 			return b.serverError(errors.New("Kafka request too short."), conn)
@@ -75,10 +79,6 @@ func (b *MockBroker) serverLoop() (ok bool) {
 		if _, err = io.ReadFull(conn, body); err != nil {
 			return b.serverError(err, conn)
 		}
-		fmt.Println("\x1b[35m", body, "\x1b[0m")
-		apiKey := binary.BigEndian.Uint16(body[0:])
-		fmt.Println("\x1b[35m", apiKey, "\x1b[0m")
-
 		response := expectation.ResponseBytes()
 
 		binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
@@ -112,13 +112,13 @@ func (b *MockBroker) serverError(err error, conn net.Conn) bool {
 // New launches a fake Kafka broker. It takes a testing.T as provided by the
 // test framework and a channel of responses to use.  If an error occurs it is
 // simply logged to the testing.T and the broker exits.
-func New(t *testing.T, brokerId int) *MockBroker {
+func New(t *testing.T, brokerID int) *MockBroker {
 	var err error
 
 	broker := &MockBroker{
 		stopper:      make(chan bool),
 		t:            t,
-		BrokerID:     brokerId,
+		brokerID:     brokerID,
 		expectations: make(chan Expectation, 512),
 	}
 

+ 34 - 0
mockbroker/produce_request_expectation_test.go

@@ -0,0 +1,34 @@
+package mockbroker
+
+import (
+	"bytes"
+	"testing"
+)
+
+func TestProduceRequestSerialization(t *testing.T) {
+
+	exp := new(ProduceRequestExpectation).
+		AddTopicPartition("topic_b", 0, 1, nil).
+		AddTopicPartition("topic_c", 0, 1, nil)
+
+	expected := []byte{
+		0x00, 0x00, 0x00, 0x02, // 0:3 number of topics
+
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // 4:12 topic name
+		0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
+		0x00, 0x00, 0x00, 0x00, // 17:20 partition id
+		0x00, 0x00, // 21:22 error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 23:30 offset
+
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of blocks for this topic
+		0x00, 0x00, 0x00, 0x00, // partition id
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // offset
+	}
+
+	actual := exp.ResponseBytes()
+	if bytes.Compare(actual, expected) != 0 {
+		t.Error("\nExpected\n", expected, "\nbut got\n", actual)
+	}
+}

+ 0 - 3
producer.go

@@ -2,7 +2,6 @@ package sarama
 
 import (
 	"errors"
-	"fmt"
 	"sync"
 	"time"
 )
@@ -278,7 +277,6 @@ func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32)
 }
 
 func (bp *brokerProducer) flushIfOverCapacity(maxBufferBytes uint32) {
-	fmt.Printf("%p %d %d\n", bp, bp.bufferedBytes, maxBufferBytes)
 	if bp.bufferedBytes > maxBufferBytes {
 		select {
 		case bp.flushNow <- true:
@@ -399,7 +397,6 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 }
 
 func (bp *brokerProducer) Close() error {
-	fmt.Printf("%p Close()\n", bp)
 	select {
 	case <-bp.stopper:
 		return errors.New("already closed or closing")

+ 28 - 73
producer_test.go

@@ -1,7 +1,6 @@
 package sarama
 
 import (
-	"encoding/binary"
 	"fmt"
 	"github.com/shopify/sarama/mockbroker"
 	"testing"
@@ -11,43 +10,20 @@ import (
 const TestMessage = "ABC THE MESSAGE"
 
 func TestSimpleProducer(t *testing.T) {
-	responses := make(chan []byte, 1)
-	extraResponses := make(chan []byte)
-	mockBroker := mockbroker.New(t, responses)
-	mockExtra := mockbroker.New(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
 
-	// return the extra mock as another available broker
-	response := []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	responses <- response
-	go func() {
-		msg := []byte{
-			0x00, 0x00, 0x00, 0x01,
-			0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-			0x00, 0x00, 0x00, 0x01,
-			0x00, 0x00, 0x00, 0x00,
-			0x00, 0x00,
-			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
-		binary.BigEndian.PutUint64(msg[23:], 0)
-		extraResponses <- msg
-	}()
+	mb1 := mockbroker.New(t, 1)
+	mb2 := mockbroker.New(t, 2)
+	defer mb1.Close()
+	defer mb2.Close()
 
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb2).
+		AddTopicPartition("my_topic", 1, 2)
+
+	mb2.ExpectProduceRequest().
+		AddTopicPartition("my_topic", 1, 10, nil)
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -68,45 +44,22 @@ func TestSimpleProducer(t *testing.T) {
 }
 
 func TestSimpleSyncProducer(t *testing.T) {
-	responses := make(chan []byte, 1)
-	extraResponses := make(chan []byte)
-	mockBroker := mockbroker.New(t, responses)
-	mockExtra := mockbroker.New(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
 
-	// return the extra mock as another available broker
-	response := []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	responses <- response
-	go func() {
-		msg := []byte{
-			0x00, 0x00, 0x00, 0x01,
-			0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-			0x00, 0x00, 0x00, 0x01,
-			0x00, 0x00, 0x00, 0x00,
-			0x00, 0x00,
-			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
-		binary.BigEndian.PutUint64(msg[23:], 0)
-		for i := 0; i < 10; i++ {
-			extraResponses <- msg
-		}
-	}()
+	mb1 := mockbroker.New(t, 1)
+	mb2 := mockbroker.New(t, 2)
+	defer mb1.Close()
+	defer mb2.Close()
 
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb2).
+		AddTopicPartition("my_topic", 1, 2)
+
+	for i := 0; i < 10; i++ {
+		mb2.ExpectProduceRequest().
+			AddTopicPartition("my_topic", 1, 10, nil)
+	}
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -124,6 +77,7 @@ func TestSimpleSyncProducer(t *testing.T) {
 	}
 }
 
+/*
 func TestMultipleFlushes(t *testing.T) {
 	responses := make(chan []byte, 1)
 	extraResponses := make(chan []byte)
@@ -431,6 +385,7 @@ func TestFailureRetry(t *testing.T) {
 	// message (which previously failed). This forces a flush.
 
 }
+*/
 
 func readMessage(t *testing.T, ch chan error) {
 	select {