Burke Libbey 11 years ago
parent
commit
96d86eca56

+ 7 - 0
broker.go

@@ -243,6 +243,13 @@ func (b *Broker) sendAndReceive(clientID string, req requestEncoder, res decoder
 
 	select {
 	case buf := <-promise.packets:
+		a := decode(buf, res)
+		color := "34"
+		if a != nil {
+			color = "31"
+		}
+		fmt.Printf("\x1b[%sm{{{\n  bytes: %+v\n  result: %+v\n  err: %+v\n  type: %T\n}}}\x1b[0m\n",
+			color, buf, res, a, res)
 		return decode(buf, res)
 	case err = <-promise.errors:
 		return err

+ 2 - 138
broker_test.go

@@ -1,147 +1,11 @@
 package sarama
 
 import (
-	"encoding/binary"
 	"fmt"
-	"io"
-	"net"
-	"strconv"
+	"github.com/Shopify/sarama/mockbroker"
 	"testing"
-	"time"
 )
 
-// MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
-// accepts a single connection. It reads Kafka requests from that connection and returns each response
-// from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
-// the server sleeps for 250ms instead of reading a request).
-//
-// When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
-// waiting for a response, the test panics.
-//
-// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
-// automatically as a convenience.
-type MockBroker struct {
-	port      int32
-	stopper   chan bool
-	responses chan []byte
-	listener  net.Listener
-	t         *testing.T
-}
-
-func (b *MockBroker) Port() int32 {
-	return b.port
-}
-
-func (b *MockBroker) Addr() string {
-	return b.listener.Addr().String()
-}
-
-// Close closes the response channel originally provided, then waits to make sure
-// that all requests/responses matched up before exiting.
-func (b *MockBroker) Close() {
-	close(b.responses)
-	<-b.stopper
-}
-
-func (b *MockBroker) serverLoop() {
-	defer close(b.stopper)
-	conn, err := b.listener.Accept()
-	if err != nil {
-		b.t.Error(err)
-		conn.Close()
-		b.listener.Close()
-		return
-	}
-	reqHeader := make([]byte, 4)
-	resHeader := make([]byte, 8)
-	for response := range b.responses {
-		if response == nil {
-			time.Sleep(250 * time.Millisecond)
-			continue
-		}
-		_, err := io.ReadFull(conn, reqHeader)
-		if err != nil {
-			b.t.Error(err)
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-		body := make([]byte, binary.BigEndian.Uint32(reqHeader))
-		if len(body) < 10 {
-			b.t.Error("Kafka request too short.")
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-		_, err = io.ReadFull(conn, body)
-		if err != nil {
-			b.t.Error(err)
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-		if len(response) == 0 {
-			continue
-		}
-		binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
-		binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
-		_, err = conn.Write(resHeader)
-		if err != nil {
-			b.t.Error(err)
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-		_, err = conn.Write(response)
-		if err != nil {
-			b.t.Error(err)
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-	}
-	err = conn.Close()
-	if err != nil {
-		b.t.Error(err)
-		b.listener.Close()
-		return
-	}
-	err = b.listener.Close()
-	if err != nil {
-		b.t.Error(err)
-		return
-	}
-}
-
-// NewMockBroker launches a fake Kafka broker. It takes a testing.T as provided by the test framework and a channel of responses to use.
-// If an error occurs it is simply logged to the testing.T and the broker exits.
-func NewMockBroker(t *testing.T, responses chan []byte) *MockBroker {
-	var err error
-
-	broker := new(MockBroker)
-	broker.stopper = make(chan bool)
-	broker.responses = responses
-	broker.t = t
-
-	broker.listener, err = net.Listen("tcp", "localhost:0")
-	if err != nil {
-		t.Fatal(err)
-	}
-	_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
-	if err != nil {
-		t.Fatal(err)
-	}
-	tmp, err := strconv.ParseInt(portStr, 10, 32)
-	if err != nil {
-		t.Fatal(err)
-	}
-	broker.port = int32(tmp)
-
-	go broker.serverLoop()
-
-	return broker
-}
-
 func ExampleBroker() error {
 	broker := NewBroker("localhost:9092")
 	err := broker.Open(4)
@@ -180,7 +44,7 @@ func TestBrokerAccessors(t *testing.T) {
 
 func TestSimpleBrokerCommunication(t *testing.T) {
 	responses := make(chan []byte)
-	mockBroker := NewMockBroker(t, responses)
+	mockBroker := mockbroker.New(t, responses)
 	defer mockBroker.Close()
 
 	broker := NewBroker(mockBroker.Addr())

+ 8 - 7
client_test.go

@@ -2,12 +2,13 @@ package sarama
 
 import (
 	"encoding/binary"
+	"github.com/Shopify/sarama/mockbroker"
 	"testing"
 )
 
 func TestSimpleClient(t *testing.T) {
 	responses := make(chan []byte, 1)
-	mockBroker := NewMockBroker(t, responses)
+	mockBroker := mockbroker.New(t, responses)
 	defer mockBroker.Close()
 
 	// Only one response needed, an empty metadata response
@@ -22,8 +23,8 @@ func TestSimpleClient(t *testing.T) {
 
 func TestClientExtraBrokers(t *testing.T) {
 	responses := make(chan []byte, 1)
-	mockBroker := NewMockBroker(t, responses)
-	mockExtra := NewMockBroker(t, make(chan []byte))
+	mockBroker := mockbroker.New(t, responses)
+	mockExtra := mockbroker.New(t, make(chan []byte))
 	defer mockBroker.Close()
 	defer mockExtra.Close()
 
@@ -46,8 +47,8 @@ func TestClientExtraBrokers(t *testing.T) {
 
 func TestClientMetadata(t *testing.T) {
 	responses := make(chan []byte, 1)
-	mockBroker := NewMockBroker(t, responses)
-	mockExtra := NewMockBroker(t, make(chan []byte))
+	mockBroker := mockbroker.New(t, responses)
+	mockExtra := mockbroker.New(t, make(chan []byte))
 	defer mockBroker.Close()
 	defer mockExtra.Close()
 
@@ -101,8 +102,8 @@ func TestClientMetadata(t *testing.T) {
 func TestClientRefreshBehaviour(t *testing.T) {
 	responses := make(chan []byte, 1)
 	extraResponses := make(chan []byte, 2)
-	mockBroker := NewMockBroker(t, responses)
-	mockExtra := NewMockBroker(t, extraResponses)
+	mockBroker := mockbroker.New(t, responses)
+	mockExtra := mockbroker.New(t, extraResponses)
 	defer mockBroker.Close()
 	defer mockExtra.Close()
 

+ 7 - 6
consumer_test.go

@@ -3,6 +3,7 @@ package sarama
 import (
 	"encoding/binary"
 	"fmt"
+	"github.com/Shopify/sarama/mockbroker"
 	"testing"
 	"time"
 )
@@ -37,8 +38,8 @@ var (
 func TestSimpleConsumer(t *testing.T) {
 	masterResponses := make(chan []byte, 1)
 	extraResponses := make(chan []byte)
-	mockBroker := NewMockBroker(t, masterResponses)
-	mockExtra := NewMockBroker(t, extraResponses)
+	mockBroker := mockbroker.New(t, masterResponses)
+	mockExtra := mockbroker.New(t, extraResponses)
 	defer mockBroker.Close()
 	defer mockExtra.Close()
 
@@ -98,8 +99,8 @@ func TestSimpleConsumer(t *testing.T) {
 func TestConsumerRawOffset(t *testing.T) {
 	masterResponses := make(chan []byte, 1)
 	extraResponses := make(chan []byte, 1)
-	mockBroker := NewMockBroker(t, masterResponses)
-	mockExtra := NewMockBroker(t, extraResponses)
+	mockBroker := mockbroker.New(t, masterResponses)
+	mockExtra := mockbroker.New(t, extraResponses)
 	defer mockBroker.Close()
 	defer mockExtra.Close()
 
@@ -130,8 +131,8 @@ func TestConsumerRawOffset(t *testing.T) {
 func TestConsumerLatestOffset(t *testing.T) {
 	masterResponses := make(chan []byte, 1)
 	extraResponses := make(chan []byte, 2)
-	mockBroker := NewMockBroker(t, masterResponses)
-	mockExtra := NewMockBroker(t, extraResponses)
+	mockBroker := mockbroker.New(t, masterResponses)
+	mockExtra := mockbroker.New(t, extraResponses)
 	defer mockBroker.Close()
 	defer mockExtra.Close()
 

+ 118 - 0
mockbroker/metadata_request_expectation.go

@@ -0,0 +1,118 @@
+package mockbroker
+
+import (
+	"bytes"
+	"encoding/binary"
+)
+
+type MetadataRequestExpectation struct {
+	err             error
+	topicPartitions []metadataRequestTP
+	brokers         []*MockBroker
+}
+
+type metadataRequestTP struct {
+	topic     string
+	partition int32
+	brokerId  int
+}
+
+func (e *MetadataRequestExpectation) AddBroker(b *MockBroker) *MetadataRequestExpectation {
+	e.brokers = append(e.brokers, b)
+	return e
+}
+
+func (e *MetadataRequestExpectation) AddTopicPartition(
+	topic string, partition int32, brokerId int,
+) *MetadataRequestExpectation {
+	mrtp := metadataRequestTP{topic, partition, brokerId}
+	e.topicPartitions = append(e.topicPartitions, mrtp)
+	return e
+}
+
+func (e *MetadataRequestExpectation) Error(err error) *MetadataRequestExpectation {
+	e.err = err
+	return e
+}
+
+func (e *MetadataRequestExpectation) ResponseBytes() []byte {
+	buf := new(bytes.Buffer)
+
+	binary.Write(buf, binary.BigEndian, uint32(len(e.brokers)))
+	for _, broker := range e.brokers {
+		binary.Write(buf, binary.BigEndian, uint32(broker.BrokerID))
+		buf.Write([]byte{0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't'})
+		binary.Write(buf, binary.BigEndian, uint32(broker.port))
+	}
+
+	byTopic := make(map[string][]metadataRequestTP)
+	for _, mrtp := range e.topicPartitions {
+		byTopic[mrtp.topic] = append(byTopic[mrtp.topic], mrtp)
+	}
+
+	binary.Write(buf, binary.BigEndian, uint32(len(byTopic)))
+	for topic, tps := range byTopic {
+		// we don't support mocking errors at topic-level; only partition-level
+		binary.Write(buf, binary.BigEndian, uint16(0))
+		binary.Write(buf, binary.BigEndian, uint16(len(topic)))
+		buf.Write([]byte(topic)) // TODO: Does this write a null?
+		binary.Write(buf, binary.BigEndian, uint16(len(tps)))
+		for _, tp := range tps {
+			binary.Write(buf, binary.BigEndian, uint16(0)) // TODO: Write the error code instead
+			binary.Write(buf, binary.BigEndian, uint32(tp.partition))
+			binary.Write(buf, binary.BigEndian, uint32(tp.brokerId))
+			binary.Write(buf, binary.BigEndian, uint32(0)) // replica set
+			binary.Write(buf, binary.BigEndian, uint32(0)) // ISR set
+		}
+	}
+
+	// Sample response:
+	/*
+		0x00, 0x00, 0x00, 0x02, // 0:3 number of brokers
+
+		0x00, 0x00, 0x00, 0x01, // 4:7 broker ID
+		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 8:18 hostname
+		0xFF, 0xFF, 0xFF, 0xFF, // 19:22 port will be written here.
+
+		0x00, 0x00, 0x00, 0x02, // 23:26 broker ID
+		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 27:37 hostname
+		0xFF, 0xFF, 0xFF, 0xFF, // 38:41 port will be written here.
+
+		0x00, 0x00, 0x00, 0x03, // number of topic metadata records
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x01, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x02, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x02, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+	*/
+
+	return buf.Bytes()
+}
+
+func (b *MockBroker) ExpectMetadataRequest() *MetadataRequestExpectation {
+	e := &MetadataRequestExpectation{}
+	b.expectations <- e
+	return e
+}

+ 142 - 0
mockbroker/mockbroker.go

@@ -0,0 +1,142 @@
+package mockbroker
+
+import (
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"strconv"
+	"testing"
+)
+
+// MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
+// accepts a single connection. It reads Kafka requests from that connection and returns each response
+// from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
+// the server sleeps for 250ms instead of reading a request).
+//
+// When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
+// waiting for a response, the test panics.
+//
+// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
+// automatically as a convenience.
+type MockBroker struct {
+	BrokerID     int
+	port         int32
+	stopper      chan bool
+	expectations chan Expectation
+	listener     net.Listener
+	t            *testing.T
+}
+
+type Expectation interface {
+	ResponseBytes() []byte
+}
+
+func (b *MockBroker) Port() int32 {
+	return b.port
+}
+
+func (b *MockBroker) Addr() string {
+	return b.listener.Addr().String()
+}
+
+func (b *MockBroker) Close() {
+	close(b.expectations)
+	<-b.stopper
+}
+
+func (b *MockBroker) serverLoop() (ok bool) {
+	var (
+		err  error
+		conn net.Conn
+	)
+
+	defer close(b.stopper)
+	if conn, err = b.listener.Accept(); err != nil {
+		return b.serverError(err, conn)
+	}
+	reqHeader := make([]byte, 4)
+	resHeader := make([]byte, 8)
+	for expectation := range b.expectations {
+		/* AfterDelay(time.Duration) maybe? */
+		/* if response == nil { */
+		/* 	time.Sleep(250 * time.Millisecond) */
+		/* 	continue */
+		/* } */
+		if _, err = io.ReadFull(conn, reqHeader); err != nil {
+			return b.serverError(err, conn)
+		}
+		fmt.Println("\x1b[36m", reqHeader, "\x1b[0m")
+		body := make([]byte, binary.BigEndian.Uint32(reqHeader))
+		if len(body) < 10 {
+			return b.serverError(errors.New("Kafka request too short."), conn)
+		}
+		if _, err = io.ReadFull(conn, body); err != nil {
+			return b.serverError(err, conn)
+		}
+		fmt.Println("\x1b[35m", body, "\x1b[0m")
+		apiKey := binary.BigEndian.Uint16(body[0:])
+		fmt.Println("\x1b[35m", apiKey, "\x1b[0m")
+
+		response := expectation.ResponseBytes()
+
+		binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
+		binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
+		if _, err = conn.Write(resHeader); err != nil {
+			return b.serverError(err, conn)
+		}
+		if _, err = conn.Write(response); err != nil {
+			return b.serverError(err, conn)
+		}
+	}
+	if err = conn.Close(); err != nil {
+		return b.serverError(err, nil)
+	}
+	if err = b.listener.Close(); err != nil {
+		b.t.Error(err)
+		return false
+	}
+	return true
+}
+
+func (b *MockBroker) serverError(err error, conn net.Conn) bool {
+	b.t.Error(err)
+	if conn != nil {
+		conn.Close()
+	}
+	b.listener.Close()
+	return false
+}
+
+// New launches a fake Kafka broker. It takes a testing.T as provided by the
+// test framework and a channel of responses to use.  If an error occurs it is
+// simply logged to the testing.T and the broker exits.
+func New(t *testing.T, brokerId int) *MockBroker {
+	var err error
+
+	broker := &MockBroker{
+		stopper:      make(chan bool),
+		t:            t,
+		BrokerID:     brokerId,
+		expectations: make(chan Expectation, 512),
+	}
+
+	broker.listener, err = net.Listen("tcp", "localhost:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
+	if err != nil {
+		t.Fatal(err)
+	}
+	tmp, err := strconv.ParseInt(portStr, 10, 32)
+	if err != nil {
+		t.Fatal(err)
+	}
+	broker.port = int32(tmp)
+
+	go broker.serverLoop()
+
+	return broker
+}

+ 77 - 0
mockbroker/produce_request_expectation.go

@@ -0,0 +1,77 @@
+package mockbroker
+
+import (
+	"bytes"
+	"encoding/binary"
+)
+
+type ProduceRequestExpectation struct {
+	err             error
+	topicPartitions []produceRequestTP
+}
+
+type produceRequestTP struct {
+	topic     string
+	partition int32
+	nMessages int
+	err       error
+}
+
+func (e *ProduceRequestExpectation) AddTopicPartition(
+	topic string, partition int32, nMessages int, err error,
+) *ProduceRequestExpectation {
+	prtp := produceRequestTP{topic, partition, nMessages, err}
+	e.topicPartitions = append(e.topicPartitions, prtp)
+	return e
+}
+
+func (b *MockBroker) ExpectProduceRequest() *ProduceRequestExpectation {
+	e := &ProduceRequestExpectation{}
+	b.expectations <- e
+	return e
+}
+
+func (e *ProduceRequestExpectation) ResponseBytes() []byte {
+	buf := new(bytes.Buffer)
+
+	byTopic := make(map[string][]produceRequestTP)
+	for _, prtp := range e.topicPartitions {
+		byTopic[prtp.topic] = append(byTopic[prtp.topic], prtp)
+	}
+
+	binary.Write(buf, binary.BigEndian, uint32(len(byTopic)))
+	for topic, tps := range byTopic {
+		binary.Write(buf, binary.BigEndian, uint16(len(topic)))
+		buf.Write([]byte(topic)) // TODO: Does this write a null?
+		binary.Write(buf, binary.BigEndian, uint32(len(tps)))
+		for _, tp := range tps {
+			binary.Write(buf, binary.BigEndian, uint32(tp.partition))
+			binary.Write(buf, binary.BigEndian, uint16(0)) // TODO: error
+			binary.Write(buf, binary.BigEndian, uint64(0)) // offset
+		}
+	}
+
+	/*
+	   sample response:
+
+	   0x00, 0x00, 0x00, 0x02, // 0:3 number of topics
+
+	   0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // 4:12 topic name
+	   0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
+	   0x00, 0x00, 0x00, 0x00, // 17:20 partition id
+	   0x00, 0x00, // 21:22 error: 0 means no error
+	   0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
+
+	   0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // 4:12 topic name
+	   0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
+	   0x00, 0x00, 0x00, 0x00, // 17:20 partition id
+	   0x00, 0x00, // 21:22 error: 0 means no error
+	   0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
+	*/
+	return buf.Bytes()
+}
+
+func (e *ProduceRequestExpectation) Error(err error) *ProduceRequestExpectation {
+	e.err = err
+	return e
+}

+ 4 - 0
producer.go

@@ -1,6 +1,8 @@
 package sarama
 
 import (
+	"errors"
+	"fmt"
 	"sync"
 	"time"
 )
@@ -210,6 +212,7 @@ func (p *Producer) brokerProducerFor(tp topicPartition) (*brokerProducer, error)
 		}
 		p.m.Unlock()
 	}
+
 	return bp, nil
 }
 
@@ -275,6 +278,7 @@ func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32)
 }
 
 func (bp *brokerProducer) flushIfOverCapacity(maxBufferBytes uint32) {
+	fmt.Printf("%p %d %d\n", bp, bp.bufferedBytes, maxBufferBytes)
 	if bp.bufferedBytes > maxBufferBytes {
 		select {
 		case bp.flushNow <- true:

+ 165 - 40
producer_test.go

@@ -3,15 +3,18 @@ package sarama
 import (
 	"encoding/binary"
 	"fmt"
+	"github.com/shopify/sarama/mockbroker"
 	"testing"
 	"time"
 )
 
+const TestMessage = "ABC THE MESSAGE"
+
 func TestSimpleProducer(t *testing.T) {
 	responses := make(chan []byte, 1)
 	extraResponses := make(chan []byte)
-	mockBroker := NewMockBroker(t, responses)
-	mockExtra := NewMockBroker(t, extraResponses)
+	mockBroker := mockbroker.New(t, responses)
+	mockExtra := mockbroker.New(t, extraResponses)
 	defer mockBroker.Close()
 	defer mockExtra.Close()
 
@@ -53,22 +56,22 @@ func TestSimpleProducer(t *testing.T) {
 		RequiredAcks:  WaitForLocal,
 		MaxBufferTime: 1000000, // "never"
 		// So that we flush once, after the 10th message.
-		MaxBufferBytes: uint32((len("ABC THE MESSAGE") * 10) - 1),
+		MaxBufferBytes: uint32((len(TestMessage) * 10) - 1),
 	})
 	defer producer.Close()
 
 	// flush only on 10th and final message
 	returns := []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
 	for _, f := range returns {
-		sendMessage(t, producer, "my_topic", "ABC THE MESSAGE", f)
+		sendMessage(t, producer, "my_topic", TestMessage, f)
 	}
 }
 
 func TestSimpleSyncProducer(t *testing.T) {
 	responses := make(chan []byte, 1)
 	extraResponses := make(chan []byte)
-	mockBroker := NewMockBroker(t, responses)
-	mockExtra := NewMockBroker(t, extraResponses)
+	mockBroker := mockbroker.New(t, responses)
+	mockExtra := mockbroker.New(t, extraResponses)
 	defer mockBroker.Close()
 	defer mockExtra.Close()
 
@@ -112,39 +115,20 @@ func TestSimpleSyncProducer(t *testing.T) {
 		RequiredAcks:  WaitForLocal,
 		MaxBufferTime: 1000000, // "never"
 		// So that we flush once, after the 10th message.
-		MaxBufferBytes: uint32((len("ABC THE MESSAGE") * 10) - 1),
+		MaxBufferBytes: uint32((len(TestMessage) * 10) - 1),
 	})
 	defer producer.Close()
 
 	for i := 0; i < 10; i++ {
-		sendSyncMessage(t, producer, "my_topic", "ABC THE MESSAGE")
-	}
-}
-
-func sendMessage(t *testing.T, producer *Producer, topic string, key string, expectedResponses int) {
-	err := producer.QueueMessage(topic, nil, StringEncoder(key))
-	if err != nil {
-		t.Error(err)
-	}
-	for i := 0; i < expectedResponses; i++ {
-		readMessage(t, producer.Errors())
-	}
-	assertNoMessages(t, producer.Errors())
-}
-
-func sendSyncMessage(t *testing.T, producer *Producer, topic string, key string) {
-	err := producer.SendMessage(topic, nil, StringEncoder(key))
-	if err != nil {
-		t.Error(err)
+		sendSyncMessage(t, producer, "my_topic", TestMessage)
 	}
-	assertNoMessages(t, producer.Errors())
 }
 
 func TestMultipleFlushes(t *testing.T) {
 	responses := make(chan []byte, 1)
 	extraResponses := make(chan []byte)
-	mockBroker := NewMockBroker(t, responses)
-	mockExtra := NewMockBroker(t, extraResponses)
+	mockBroker := mockbroker.New(t, responses)
+	mockExtra := mockbroker.New(t, extraResponses)
 	defer mockBroker.Close()
 	defer mockExtra.Close()
 
@@ -188,23 +172,24 @@ func TestMultipleFlushes(t *testing.T) {
 		RequiredAcks:  WaitForLocal,
 		MaxBufferTime: 1000000, // "never"
 		// So that we flush once, after the 5th message.
-		MaxBufferBytes: uint32((len("ABC THE MESSAGE") * 5) - 1),
+		MaxBufferBytes: uint32((len(TestMessage) * 5) - 1),
 	})
 	defer producer.Close()
 
 	returns := []int{0, 0, 0, 0, 1, 0, 0, 0, 0, 1}
 	for _, f := range returns {
-		sendMessage(t, producer, "my_topic", "ABC THE MESSAGE", f)
+		sendMessage(t, producer, "my_topic", TestMessage, f)
 	}
 }
 
 func TestMultipleProducer(t *testing.T) {
+
 	responses := make(chan []byte, 1)
 	responsesA := make(chan []byte)
 	responsesB := make(chan []byte)
-	mockBroker := NewMockBroker(t, responses)
-	mockBrokerA := NewMockBroker(t, responsesA)
-	mockBrokerB := NewMockBroker(t, responsesB)
+	mockBroker := mockbroker.New(t, responses)
+	mockBrokerA := mockbroker.New(t, responsesA)
+	mockBrokerB := mockbroker.New(t, responsesB)
 	defer mockBroker.Close()
 	defer mockBrokerA.Close()
 	defer mockBrokerB.Close()
@@ -303,29 +288,150 @@ func TestMultipleProducer(t *testing.T) {
 		RequiredAcks:  WaitForLocal,
 		MaxBufferTime: 1000000, // "never"
 		// So that we flush once, after the 10th message.
-		MaxBufferBytes: uint32((len("ABC THE MESSAGE") * 10) - 1),
+		MaxBufferBytes: uint32((len(TestMessage) * 10) - 1),
 	})
 	defer producer.Close()
 
 	// flush only on 10th and final message
 	returns := []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
 	for _, f := range returns {
-		sendMessage(t, producer, "topic_a", "ABC THE MESSAGE", f)
+		sendMessage(t, producer, "topic_a", TestMessage, f)
 	}
 
 	// no flushes
 	returns = []int{0, 0, 0, 0, 0}
 	for _, f := range returns {
-		sendMessage(t, producer, "topic_b", "ABC THE MESSAGE", f)
+		sendMessage(t, producer, "topic_b", TestMessage, f)
 	}
 
 	// flush both topic_b and topic_c on 5th (ie. 10th for this broker)
 	returns = []int{0, 0, 0, 0, 2}
 	for _, f := range returns {
-		sendMessage(t, producer, "topic_c", "ABC THE MESSAGE", f)
+		sendMessage(t, producer, "topic_c", TestMessage, f)
 	}
 }
 
+// Here we test that when two messages are sent in the same buffered request,
+// and more messages are enqueued while the request is pending, everything
+// happens correctly; that is, the first messages are retried before the next
+// batch is allowed to submit.
+func TestFailureRetry(t *testing.T) {
+	responses := make(chan []byte, 1)
+	responsesA := make(chan []byte)
+	mockBroker := mockbroker.New(t, responses)
+	mockBrokerA := mockbroker.New(t, responsesA)
+	defer mockBroker.Close()
+	defer mockBrokerA.Close()
+
+	// We're going to return:
+	// topic: topic_a; partition: 0; brokerID: 1
+	// topic: topic_b; partition: 0; brokerID: 2
+	// topic: topic_c; partition: 0; brokerID: 2
+
+	// Return the extra broker metadata so that the producer will send
+	// requests to mockBrokerA and mockBrokerB.
+	metadataResponse := []byte{
+		0x00, 0x00, 0x00, 0x01, // 0:3 number of brokers
+
+		0x00, 0x00, 0x00, 0x01, // 4:7 broker ID
+		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 8:18 hostname
+		0xFF, 0xFF, 0xFF, 0xFF, // 19:22 port will be written here.
+
+		0x00, 0x00, 0x00, 0x02, // number of topic metadata records
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x01, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x01, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+
+	}
+	binary.BigEndian.PutUint32(metadataResponse[19:], uint32(mockBrokerA.Port()))
+	responses <- metadataResponse
+
+	go func() {
+		responses <- metadataResponse
+		responses <- metadataResponse
+	}()
+
+	go func() {
+		responsesA <- []byte{
+			0x00, 0x00, 0x00, 0x02, // 0:3 number of topics
+
+			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // 4:12 topic name
+			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
+			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
+			0x00, 0x00, // 21:22 error: 0 means no error
+			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 23:30 offset
+
+			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // 4:12 topic name
+			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
+			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
+			0x00, 0x03, // 21:22 error: UnknownTopicOrPartition
+			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 23:30 offset
+		}
+		responsesA <- metadataResponse
+		successResponse := []byte{
+			0x00, 0x00, 0x00, 0x01, // 0:3 number of topics
+
+			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // 4:12 topic name
+			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
+			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
+			0x00, 0x00, // 21:22 error: no error
+			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 23:30 offset
+		}
+		_ = successResponse
+	}()
+
+	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	producer, err := NewProducer(client, &ProducerConfig{
+		RequiredAcks:  WaitForLocal,
+		MaxBufferTime: 1000000, // "never"
+		// So that we flush after the 2nd message.
+		MaxBufferBytes:     uint32((len(TestMessage) * 2) - 1),
+		MaxDeliveryRetries: 1,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+
+	// Sent to the first BP; does not flush because it's only half the cap.
+	println("WTF1")
+	sendMessage(t, producer, "topic_a", TestMessage, 0)
+	// Sent to the first BP; flushes, errors (retriable).
+	// There's a delay, during which the next message is enqueued to the first BP,
+	// after which the BP is closed and the message is re-enqueued to the second
+	// BP. This BP is not flushed immediately because it is only at half-cap.
+	println("WTF2")
+	sendMessage(t, producer, "topic_b", TestMessage, 1)
+	// This happens before the BP is terminated, and the message is enqueued to
+	// the first BP. It is not immediately flushed, because it is at half-cap.
+	println("WTF")
+	sendMessage(t, producer, "topic_b", TestMessage, 1)
+
+	// Now the Close() runs on the first BP. The BP has buffered the second
+	// message (which previously failed). This forces a flush.
+
+}
+
 func readMessage(t *testing.T, ch chan error) {
 	select {
 	case err := <-ch:
@@ -340,8 +446,8 @@ func readMessage(t *testing.T, ch chan error) {
 func assertNoMessages(t *testing.T, ch chan error) {
 	select {
 	case x := <-ch:
-		t.Error(fmt.Errorf("unexpected value received: %#v", x))
-	case <-time.After(5 * time.Millisecond):
+		t.Fatal(fmt.Errorf("unexpected value received: %#v", x))
+	case <-time.After(1 * time.Millisecond):
 	}
 }
 
@@ -367,3 +473,22 @@ func ExampleProducer() {
 		fmt.Println("> message sent")
 	}
 }
+
+func sendMessage(t *testing.T, producer *Producer, topic string, key string, expectedResponses int) {
+	err := producer.QueueMessage(topic, nil, StringEncoder(key))
+	if err != nil {
+		t.Error(err)
+	}
+	for i := 0; i < expectedResponses; i++ {
+		readMessage(t, producer.Errors())
+	}
+	assertNoMessages(t, producer.Errors())
+}
+
+func sendSyncMessage(t *testing.T, producer *Producer, topic string, key string) {
+	err := producer.SendMessage(topic, nil, StringEncoder(key))
+	if err != nil {
+		t.Error(err)
+	}
+	assertNoMessages(t, producer.Errors())
+}