
Merge pull request #41 from Shopify/multiproducer

MultiProducer
Burke Libbey · 12 years ago · commit 4f917dbfae
18 changed files with 1394 additions and 468 deletions
  1. .gitignore (+1 -0)
  2. broker.go (+25 -0)
  3. broker_test.go (+15 -146)
  4. client_test.go (+45 -93)
  5. consumer_test.go (+46 -103)
  6. fetch_response.go (+68 -0)
  7. metadata_response.go (+108 -0)
  8. mockbroker.go (+155 -0)
  9. offset_fetch_response.go (+13 -0)
  10. offset_response.go (+52 -0)
  11. packet_encoder.go (+2 -0)
  12. partitioner.go (+4 -0)
  13. prep_encoder.go (+17 -0)
  14. produce_message.go (+101 -0)
  15. produce_response.go (+37 -0)
  16. producer.go (+388 -82)
  17. producer_test.go (+303 -44)
  18. real_encoder.go (+14 -0)

+ 1 - 0
.gitignore

@@ -2,6 +2,7 @@
 *.o
 *.a
 *.so
+*.test
 
 # Folders
 _obj

+ 25 - 0
broker.go

@@ -4,6 +4,8 @@ import (
 	"fmt"
 	"io"
 	"net"
+	"strconv"
+	"strings"
 	"sync"
 )
 
@@ -273,6 +275,29 @@ func (b *Broker) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
+func (b *Broker) encode(pe packetEncoder) (err error) {
+
+	parts := strings.Split(b.addr, ":")
+	if len(parts) != 2 {
+		return fmt.Errorf("invalid addr")
+	}
+	port, err := strconv.Atoi(parts[1])
+	if err != nil {
+		return err
+	}
+
+	pe.putInt32(b.id)
+
+	err = pe.putString(parts[0])
+	if err != nil {
+		return err
+	}
+
+	pe.putInt32(int32(port))
+
+	return nil
+}
+
 func (b *Broker) responseReceiver() {
 	header := make([]byte, 8)
 	for response := range b.responses {
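
The new Broker.encode above recovers host and port by splitting b.addr on ":". As a point of reference only — not part of this diff — the same split can be done with net.SplitHostPort, the stdlib helper that the new mockbroker.go uses for its own listener address, and which also accepts IPv6 literals. A minimal sketch (the helper name is illustrative; net and strconv are already imported here):

func splitBrokerAddr(addr string) (host string, port int32, err error) {
	// net.SplitHostPort understands "host:port" as well as "[::1]:9092".
	h, p, err := net.SplitHostPort(addr)
	if err != nil {
		return "", 0, err
	}
	// Same port conversion Broker.encode performs with strconv.Atoi.
	n, err := strconv.Atoi(p)
	if err != nil {
		return "", 0, err
	}
	return h, int32(n), nil
}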

+ 15 - 146
broker_test.go

@@ -1,147 +1,10 @@
 package sarama
 
 import (
-	"encoding/binary"
 	"fmt"
-	"io"
-	"net"
-	"strconv"
 	"testing"
-	"time"
 )
 
-// MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
-// accepts a single connection. It reads Kafka requests from that connection and returns each response
-// from the channel provided at creation-time (if a response has a len of 0, nothing is sent; if a response
-// is nil, the server sleeps for 250ms instead of reading a request).
-//
-// When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
-// waiting for a response, the test panics.
-//
-// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
-// automatically as a convenience.
-type MockBroker struct {
-	port      int32
-	stopper   chan bool
-	responses chan []byte
-	listener  net.Listener
-	t         *testing.T
-}
-
-func (b *MockBroker) Port() int32 {
-	return b.port
-}
-
-func (b *MockBroker) Addr() string {
-	return b.listener.Addr().String()
-}
-
-// Close closes the response channel originally provided, then waits to make sure
-// that all requests/responses matched up before exiting.
-func (b *MockBroker) Close() {
-	close(b.responses)
-	<-b.stopper
-}
-
-func (b *MockBroker) serverLoop() {
-	defer close(b.stopper)
-	conn, err := b.listener.Accept()
-	if err != nil {
-		b.t.Error(err)
-		conn.Close()
-		b.listener.Close()
-		return
-	}
-	reqHeader := make([]byte, 4)
-	resHeader := make([]byte, 8)
-	for response := range b.responses {
-		if response == nil {
-			time.Sleep(250 * time.Millisecond)
-			continue
-		}
-		_, err := io.ReadFull(conn, reqHeader)
-		if err != nil {
-			b.t.Error(err)
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-		body := make([]byte, binary.BigEndian.Uint32(reqHeader))
-		if len(body) < 10 {
-			b.t.Error("Kafka request too short.")
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-		_, err = io.ReadFull(conn, body)
-		if err != nil {
-			b.t.Error(err)
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-		if len(response) == 0 {
-			continue
-		}
-		binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
-		binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
-		_, err = conn.Write(resHeader)
-		if err != nil {
-			b.t.Error(err)
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-		_, err = conn.Write(response)
-		if err != nil {
-			b.t.Error(err)
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-	}
-	err = conn.Close()
-	if err != nil {
-		b.t.Error(err)
-		b.listener.Close()
-		return
-	}
-	err = b.listener.Close()
-	if err != nil {
-		b.t.Error(err)
-		return
-	}
-}
-
-// NewMockBroker launches a fake Kafka broker. It takes a testing.T as provided by the test framework and a channel of responses to use.
-// If an error occurs it is simply logged to the testing.T and the broker exits.
-func NewMockBroker(t *testing.T, responses chan []byte) *MockBroker {
-	var err error
-
-	broker := new(MockBroker)
-	broker.stopper = make(chan bool)
-	broker.responses = responses
-	broker.t = t
-
-	broker.listener, err = net.Listen("tcp", "localhost:0")
-	if err != nil {
-		t.Fatal(err)
-	}
-	_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
-	if err != nil {
-		t.Fatal(err)
-	}
-	tmp, err := strconv.ParseInt(portStr, 10, 32)
-	if err != nil {
-		t.Fatal(err)
-	}
-	broker.port = int32(tmp)
-
-	go broker.serverLoop()
-
-	return broker
-}
-
 func ExampleBroker() error {
 	broker := NewBroker("localhost:9092")
 	err := broker.Open(4)
@@ -161,6 +24,15 @@ func ExampleBroker() error {
 	return nil
 }
 
+type mockEncoder struct {
+	bytes []byte
+}
+
+func (m mockEncoder) encode(pe packetEncoder) error {
+	pe.putRawBytes(m.bytes)
+	return nil
+}
+
 func TestBrokerAccessors(t *testing.T) {
 	broker := NewBroker("abc:123")
 
@@ -179,21 +51,18 @@ func TestBrokerAccessors(t *testing.T) {
 }
 
 func TestSimpleBrokerCommunication(t *testing.T) {
-	responses := make(chan []byte)
-	mockBroker := NewMockBroker(t, responses)
-	defer mockBroker.Close()
+	mb := NewMockBroker(t, 0)
+	defer mb.Close()
 
-	broker := NewBroker(mockBroker.Addr())
+	broker := NewBroker(mb.Addr())
 	err := broker.Open(4)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	go func() {
-		for _, tt := range brokerTestTable {
-			responses <- tt.response
-		}
-	}()
+	for _, tt := range brokerTestTable {
+		mb.Returns(&mockEncoder{tt.response})
+	}
 	for _, tt := range brokerTestTable {
 		tt.runner(t, broker)
 	}

+ 45 - 93
client_test.go

@@ -1,80 +1,58 @@
 package sarama
 
 import (
-	"encoding/binary"
 	"testing"
 )
 
 func TestSimpleClient(t *testing.T) {
-	responses := make(chan []byte, 1)
-	mockBroker := NewMockBroker(t, responses)
-	defer mockBroker.Close()
 
-	// Only one response needed, an empty metadata response
-	responses <- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
+	mb := NewMockBroker(t, 1)
 
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	mb.Returns(new(MetadataResponse))
+
+	client, err := NewClient("client_id", []string{mb.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	client.Close()
+	defer client.Close()
+	defer mb.Close()
 }
 
 func TestClientExtraBrokers(t *testing.T) {
-	responses := make(chan []byte, 1)
-	mockBroker := NewMockBroker(t, responses)
-	mockExtra := NewMockBroker(t, make(chan []byte))
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	responses <- response
-
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
+
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mb1.Returns(mdr)
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	client.Close()
+	defer client.Close()
+	defer mb1.Close()
+	defer mb2.Close()
 }
 
 func TestClientMetadata(t *testing.T) {
-	responses := make(chan []byte, 1)
-	mockBroker := NewMockBroker(t, responses)
-	mockExtra := NewMockBroker(t, make(chan []byte))
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x05,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x05,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	responses <- response
-
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+
+	mb1 := NewMockBroker(t, 1)
+	mb5 := NewMockBroker(t, 5)
+
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb5.Addr(), int32(mb5.BrokerID()))
+	mdr.AddTopicPartition("my_topic", 0, int32(mb5.BrokerID()))
+	mb1.Returns(mdr)
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer client.Close()
+	defer mb1.Close()
+	defer mb5.Close()
 
 	topics, err := client.Topics()
 	if err != nil {
@@ -99,50 +77,24 @@ func TestClientMetadata(t *testing.T) {
 }
 
 func TestClientRefreshBehaviour(t *testing.T) {
-	responses := make(chan []byte, 1)
-	extraResponses := make(chan []byte, 2)
-	mockBroker := NewMockBroker(t, responses)
-	mockExtra := NewMockBroker(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0xaa,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	responses <- response
-	extraResponses <- []byte{
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x05,
-		0x00, 0x00, 0x00, 0x0e,
-		0xFF, 0xFF, 0xFF, 0xFF,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	extraResponses <- []byte{
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x0b,
-		0x00, 0x00, 0x00, 0xaa,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-
-	client, err := NewClient("clientID", []string{mockBroker.Addr()}, &ClientConfig{MetadataRetries: 1})
+	mb1 := NewMockBroker(t, 1)
+	mb5 := NewMockBroker(t, 5)
+
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb5.Addr(), int32(mb5.BrokerID()))
+	mb1.Returns(mdr)
+
+	mdr2 := new(MetadataResponse)
+	mdr2.AddTopicPartition("my_topic", 0xb, int32(mb5.BrokerID()))
+	mb5.Returns(mdr2)
+
+	client, err := NewClient("clientID", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1})
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer client.Close()
+	defer mb1.Close()
+	defer mb5.Close()
 
 	parts, err := client.Partitions("my_topic")
 	if err != nil {
@@ -154,7 +106,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	tst, err := client.Leader("my_topic", 0xb)
 	if err != nil {
 		t.Error(err)
-	} else if tst.ID() != 0xaa {
+	} else if tst.ID() != 5 {
 		t.Error("Leader for my_topic had incorrect ID.")
 	}
 

+ 46 - 103
consumer_test.go

@@ -1,78 +1,27 @@
 package sarama
 
 import (
-	"encoding/binary"
 	"fmt"
 	"testing"
 	"time"
 )
 
-var (
-	consumerStopper = []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-	}
-	extraBrokerMetadata = []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-	}
-)
-
 func TestSimpleConsumer(t *testing.T) {
-	masterResponses := make(chan []byte, 1)
-	extraResponses := make(chan []byte)
-	mockBroker := NewMockBroker(t, masterResponses)
-	mockExtra := NewMockBroker(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := make([]byte, len(extraBrokerMetadata))
-	copy(response, extraBrokerMetadata)
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	masterResponses <- response
-	go func() {
-		for i := 0; i < 10; i++ {
-			msg := []byte{
-				0x00, 0x00, 0x00, 0x01,
-				0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-				0x00, 0x00, 0x00, 0x01,
-				0x00, 0x00, 0x00, 0x00,
-				0x00, 0x00,
-				0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-				0x00, 0x00, 0x00, 0x1C,
-				// messageSet
-				0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-				0x00, 0x00, 0x00, 0x10,
-				// message
-				0x23, 0x96, 0x4a, 0xf7, // CRC
-				0x00,
-				0x00,
-				0xFF, 0xFF, 0xFF, 0xFF,
-				0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
-			binary.BigEndian.PutUint64(msg[36:], uint64(i))
-			extraResponses <- msg
-		}
-		extraResponses <- consumerStopper
-	}()
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
 
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mdr.AddTopicPartition("my_topic", 0, 2)
+	mb1.Returns(mdr)
+
+	for i := 0; i < 10; i++ {
+		fr := new(FetchResponse)
+		fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
+		mb2.Returns(fr)
+	}
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -83,6 +32,8 @@ func TestSimpleConsumer(t *testing.T) {
 		t.Fatal(err)
 	}
 	defer consumer.Close()
+	defer mb1.Close()
+	defer mb2.Close()
 
 	for i := 0; i < 10; i++ {
 		event := <-consumer.Events()
@@ -93,24 +44,20 @@ func TestSimpleConsumer(t *testing.T) {
 			t.Error("Incorrect message offset!")
 		}
 	}
+
 }
 
 func TestConsumerRawOffset(t *testing.T) {
-	masterResponses := make(chan []byte, 1)
-	extraResponses := make(chan []byte, 1)
-	mockBroker := NewMockBroker(t, masterResponses)
-	mockExtra := NewMockBroker(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := make([]byte, len(extraBrokerMetadata))
-	copy(response, extraBrokerMetadata)
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	masterResponses <- response
-	extraResponses <- consumerStopper
-
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
+
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mdr.AddTopicPartition("my_topic", 0, 2)
+	mb1.Returns(mdr)
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -122,36 +69,29 @@ func TestConsumerRawOffset(t *testing.T) {
 	}
 	defer consumer.Close()
 
+	defer mb1.Close()
+	defer mb2.Close()
+
 	if consumer.offset != 1234 {
 		t.Error("Raw offset not set correctly")
 	}
 }
 
 func TestConsumerLatestOffset(t *testing.T) {
-	masterResponses := make(chan []byte, 1)
-	extraResponses := make(chan []byte, 2)
-	mockBroker := NewMockBroker(t, masterResponses)
-	mockExtra := NewMockBroker(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := make([]byte, len(extraBrokerMetadata))
-	copy(response, extraBrokerMetadata)
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	masterResponses <- response
-	extraResponses <- []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01,
-	}
-	extraResponses <- consumerStopper
 
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
+
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mdr.AddTopicPartition("my_topic", 0, 2)
+	mb1.Returns(mdr)
+
+	or := new(OffsetResponse)
+	or.AddTopicPartition("my_topic", 0, 0x010101)
+	mb2.Returns(or)
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -163,6 +103,9 @@ func TestConsumerLatestOffset(t *testing.T) {
 	}
 	defer consumer.Close()
 
+	defer mb2.Close()
+	defer mb1.Close()
+
 	if consumer.offset != 0x010101 {
 		t.Error("Latest offset not fetched correctly")
 	}

+ 68 - 0
fetch_response.go

@@ -36,6 +36,19 @@ type FetchResponse struct {
 	Blocks map[string]map[int32]*FetchResponseBlock
 }
 
+func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
+	pe.putInt16(int16(pr.Err))
+
+	pe.putInt64(pr.HighWaterMarkOffset)
+
+	pe.push(&lengthField{})
+	err = pr.MsgSet.encode(pe)
+	if err != nil {
+		return err
+	}
+	return pe.pop()
+}
+
 func (fr *FetchResponse) decode(pd packetDecoder) (err error) {
 	numTopics, err := pd.getArrayLength()
 	if err != nil {
@@ -74,6 +87,35 @@ func (fr *FetchResponse) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
+func (fr *FetchResponse) encode(pe packetEncoder) (err error) {
+	err = pe.putArrayLength(len(fr.Blocks))
+	if err != nil {
+		return err
+	}
+
+	for topic, partitions := range fr.Blocks {
+		err = pe.putString(topic)
+		if err != nil {
+			return err
+		}
+
+		err = pe.putArrayLength(len(partitions))
+		if err != nil {
+			return err
+		}
+
+		for id, block := range partitions {
+			pe.putInt32(id)
+			err = block.encode(pe)
+			if err != nil {
+				return err
+			}
+		}
+
+	}
+	return nil
+}
+
 func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
 	if fr.Blocks == nil {
 		return nil
@@ -85,3 +127,29 @@ func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseB
 
 	return fr.Blocks[topic][partition]
 }
+
+func (fr *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
+	if fr.Blocks == nil {
+		fr.Blocks = make(map[string]map[int32]*FetchResponseBlock)
+	}
+	partitions, ok := fr.Blocks[topic]
+	if !ok {
+		partitions = make(map[int32]*FetchResponseBlock)
+		fr.Blocks[topic] = partitions
+	}
+	frb := new(FetchResponseBlock)
+	partitions[partition] = frb
+	var kb []byte
+	var vb []byte
+	if key != nil {
+		kb, _ = key.Encode()
+	}
+	if value != nil {
+		vb, _ = value.Encode()
+	}
+	var msgSet MessageSet
+	msg := &Message{Key: kb, Value: vb}
+	msgBlock := &MessageBlock{Msg: msg, Offset: offset}
+	msgSet.Messages = append(msgSet.Messages, msgBlock)
+	frb.MsgSet = msgSet
+}

+ 108 - 0
metadata_response.go

@@ -38,6 +38,24 @@ func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
+func (pm *PartitionMetadata) encode(pe packetEncoder) (err error) {
+	pe.putInt16(int16(pm.Err))
+	pe.putInt32(pm.ID)
+	pe.putInt32(pm.Leader)
+
+	err = pe.putInt32Array(pm.Replicas)
+	if err != nil {
+		return err
+	}
+
+	err = pe.putInt32Array(pm.Isr)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
 type TopicMetadata struct {
 	Err        KError
 	Name       string
@@ -72,6 +90,29 @@ func (tm *TopicMetadata) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
+func (tm *TopicMetadata) encode(pe packetEncoder) (err error) {
+	pe.putInt16(int16(tm.Err))
+
+	err = pe.putString(tm.Name)
+	if err != nil {
+		return err
+	}
+
+	err = pe.putArrayLength(len(tm.Partitions))
+	if err != nil {
+		return err
+	}
+
+	for _, pm := range tm.Partitions {
+		err = pm.encode(pe)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 type MetadataResponse struct {
 	Brokers []*Broker
 	Topics  []*TopicMetadata
@@ -108,3 +149,70 @@ func (m *MetadataResponse) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+func (m *MetadataResponse) encode(pe packetEncoder) error {
+	err := pe.putArrayLength(len(m.Brokers))
+	if err != nil {
+		return err
+	}
+	for _, broker := range m.Brokers {
+		err = broker.encode(pe)
+		if err != nil {
+			return err
+		}
+	}
+
+	err = pe.putArrayLength(len(m.Topics))
+	if err != nil {
+		return err
+	}
+	for _, tm := range m.Topics {
+		err = tm.encode(pe)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// testing API
+
+func (m *MetadataResponse) AddBroker(addr string, id int32) {
+	m.Brokers = append(m.Brokers, &Broker{id: id, addr: addr})
+}
+
+func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32) {
+	var match *TopicMetadata
+
+	for _, tm := range m.Topics {
+		if tm.Name == topic {
+			match = tm
+			goto foundTopic
+		}
+	}
+
+	match = new(TopicMetadata)
+	match.Name = topic
+	m.Topics = append(m.Topics, match)
+
+foundTopic:
+
+	var pmatch *PartitionMetadata
+
+	for _, pm := range match.Partitions {
+		if pm.ID == partition {
+			pmatch = pm
+			goto foundPartition
+		}
+	}
+
+	pmatch = new(PartitionMetadata)
+	pmatch.ID = partition
+	match.Partitions = append(match.Partitions, pmatch)
+
+foundPartition:
+
+	pmatch.Leader = brokerID
+
+}
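
The testing API above upserts: AddTopicPartition looks for an existing TopicMetadata (and PartitionMetadata) before creating one, so repeated calls for the same topic grow a single entry. A small sketch of that behaviour, with an illustrative address and IDs:

mdr := new(MetadataResponse)
mdr.AddBroker("localhost:9092", 1)      // broker metadata for the response
mdr.AddTopicPartition("my_topic", 0, 1) // creates the "my_topic" entry
mdr.AddTopicPartition("my_topic", 1, 1) // reuses it, appending partition 1
// len(mdr.Topics) == 1 and len(mdr.Topics[0].Partitions) == 2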

+ 155 - 0
mockbroker.go

@@ -0,0 +1,155 @@
+package sarama
+
+import (
+	"encoding/binary"
+	"errors"
+	"io"
+	"net"
+	"strconv"
+	"testing"
+)
+
+// MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
+// accepts a single connection. It reads Kafka requests from that connection and returns each response
+// in the order it was queued via Returns() (if a response has a len of 0, nothing is sent; if a response
+// is nil, the server sleeps for 250ms instead of reading a request).
+//
+// When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
+// waiting for a response, the test panics.
+//
+// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
+// automatically as a convenience.
+type MockBroker struct {
+	brokerID     int
+	port         int32
+	stopper      chan bool
+	expectations chan encoder
+	listener     net.Listener
+	t            *testing.T
+	expecting    encoder
+}
+
+func (b *MockBroker) BrokerID() int {
+	return b.brokerID
+}
+
+func (b *MockBroker) Port() int32 {
+	return b.port
+}
+
+func (b *MockBroker) Addr() string {
+	return b.listener.Addr().String()
+}
+
+type rawExpectation []byte
+
+func (r rawExpectation) ResponseBytes() []byte {
+	return r
+}
+
+func (b *MockBroker) Close() {
+	if b.expecting != nil {
+		b.t.Fatalf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %#v", b.BrokerID(), b.expecting)
+	}
+	close(b.expectations)
+	<-b.stopper
+}
+
+func (b *MockBroker) serverLoop() (ok bool) {
+	var (
+		err  error
+		conn net.Conn
+	)
+
+	defer close(b.stopper)
+	if conn, err = b.listener.Accept(); err != nil {
+		return b.serverError(err, conn)
+	}
+	reqHeader := make([]byte, 4)
+	resHeader := make([]byte, 8)
+	for expectation := range b.expectations {
+		b.expecting = expectation
+		_, err = io.ReadFull(conn, reqHeader)
+		b.expecting = nil
+		if err != nil {
+			return b.serverError(err, conn)
+		}
+		body := make([]byte, binary.BigEndian.Uint32(reqHeader))
+		if len(body) < 10 {
+			return b.serverError(errors.New("Kafka request too short."), conn)
+		}
+		if _, err = io.ReadFull(conn, body); err != nil {
+			return b.serverError(err, conn)
+		}
+
+		response, err := encode(expectation)
+		if err != nil {
+			return false
+		}
+		if len(response) == 0 {
+			continue
+		}
+
+		binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
+		binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
+		if _, err = conn.Write(resHeader); err != nil {
+			return b.serverError(err, conn)
+		}
+		if _, err = conn.Write(response); err != nil {
+			return b.serverError(err, conn)
+		}
+	}
+	if err = conn.Close(); err != nil {
+		return b.serverError(err, nil)
+	}
+	if err = b.listener.Close(); err != nil {
+		b.t.Error(err)
+		return false
+	}
+	return true
+}
+
+func (b *MockBroker) serverError(err error, conn net.Conn) bool {
+	b.t.Error(err)
+	if conn != nil {
+		conn.Close()
+	}
+	b.listener.Close()
+	return false
+}
+
+// NewMockBroker launches a fake Kafka broker. It takes a testing.T as provided by the
+// test framework and a broker ID to use. If an error occurs it is
+// simply logged to the testing.T and the broker exits.
+func NewMockBroker(t *testing.T, brokerID int) *MockBroker {
+	var err error
+
+	broker := &MockBroker{
+		stopper:      make(chan bool),
+		t:            t,
+		brokerID:     brokerID,
+		expectations: make(chan encoder, 512),
+	}
+
+	broker.listener, err = net.Listen("tcp", "localhost:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
+	if err != nil {
+		t.Fatal(err)
+	}
+	tmp, err := strconv.ParseInt(portStr, 10, 32)
+	if err != nil {
+		t.Fatal(err)
+	}
+	broker.port = int32(tmp)
+
+	go broker.serverLoop()
+
+	return broker
+}
+
+func (b *MockBroker) Returns(e encoder) {
+	b.expectations <- e
+}
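
Taken together, the new MockBroker replaces the raw-bytes response channel with a queue of expectations: a test queues response objects via Returns() and the broker encodes and serves one per request it reads. A minimal usage sketch, mirroring the pattern in client_test.go above (topic name and IDs are illustrative):

func TestWithMockBroker(t *testing.T) {
	mb := NewMockBroker(t, 1) // broker ID 1 on a kernel-selected localhost port
	defer mb.Close()          // fails the test if queued expectations were never consumed

	// Queue one response; it is served to the first request the broker reads.
	mdr := new(MetadataResponse)
	mdr.AddBroker(mb.Addr(), int32(mb.BrokerID()))
	mdr.AddTopicPartition("my_topic", 0, int32(mb.BrokerID()))
	mb.Returns(mdr)

	client, err := NewClient("client_id", []string{mb.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()
}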

+ 13 - 0
offset_fetch_response.go

@@ -26,6 +26,19 @@ func (r *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
+func (r *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
+	pe.putInt64(r.Offset)
+
+	err = pe.putString(r.Metadata)
+	if err != nil {
+		return err
+	}
+
+	pe.putInt16(int16(r.Err))
+
+	return nil
+}
+
 type OffsetFetchResponse struct {
 	ClientID string
 	Blocks   map[string]map[int32]*OffsetFetchResponseBlock

+ 52 - 0
offset_response.go

@@ -17,6 +17,12 @@ func (r *OffsetResponseBlock) decode(pd packetDecoder) (err error) {
 	return err
 }
 
+func (r *OffsetResponseBlock) encode(pe packetEncoder) (err error) {
+	pe.putInt16(int16(r.Err))
+
+	return pe.putInt64Array(r.Offsets)
+}
+
 type OffsetResponse struct {
 	Blocks map[string]map[int32]*OffsetResponseBlock
 }
@@ -70,3 +76,49 @@ func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponse
 
 	return r.Blocks[topic][partition]
 }
+
+/*
+// [0 0 0 1 ntopics
+0 8 109 121 95 116 111 112 105 99 topic
+0 0 0 1 npartitions
+0 0 0 0 id
+0 0
+
+0 0 0 1 0 0 0 0
+0 1 1 1 0 0 0 1
+0 8 109 121 95 116 111 112
+105 99 0 0 0 1 0 0
+0 0 0 0 0 0 0 1
+0 0 0 0 0 1 1 1] <nil>
+
+*/
+func (r *OffsetResponse) encode(pe packetEncoder) (err error) {
+	if err = pe.putArrayLength(len(r.Blocks)); err != nil {
+		return err
+	}
+
+	for topic, partitions := range r.Blocks {
+		pe.putString(topic)
+		pe.putArrayLength(len(partitions))
+		for partition, block := range partitions {
+			pe.putInt32(partition)
+			block.encode(pe)
+		}
+	}
+
+	return nil
+}
+
+// testing API
+
+func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) {
+	if r.Blocks == nil {
+		r.Blocks = make(map[string]map[int32]*OffsetResponseBlock)
+	}
+	byTopic, ok := r.Blocks[topic]
+	if !ok {
+		byTopic = make(map[int32]*OffsetResponseBlock)
+		r.Blocks[topic] = byTopic
+	}
+	byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}}
+}

+ 2 - 0
packet_encoder.go

@@ -13,8 +13,10 @@ type packetEncoder interface {
 
 	// Collections
 	putBytes(in []byte) error
+	putRawBytes(in []byte) error
 	putString(in string) error
 	putInt32Array(in []int32) error
+	putInt64Array(in []int64) error
 
 	// Stacks, see PushEncoder
 	push(in pushEncoder)

+ 4 - 0
partitioner.go

@@ -4,6 +4,7 @@ import (
 	"hash"
 	"hash/fnv"
 	"math/rand"
+	"sync"
 	"time"
 )
 
@@ -17,6 +18,7 @@ type Partitioner interface {
 // RandomPartitioner implements the Partitioner interface by choosing a random partition each time.
 type RandomPartitioner struct {
 	generator *rand.Rand
+	m         sync.Mutex
 }
 
 func NewRandomPartitioner() *RandomPartitioner {
@@ -26,6 +28,8 @@ func NewRandomPartitioner() *RandomPartitioner {
 }
 
 func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) int32 {
+	p.m.Lock()
+	defer p.m.Unlock()
 	return int32(p.generator.Intn(int(numPartitions)))
 }
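
The mutex added here exists because math/rand's *Rand is not safe for concurrent use, and with the multi-producer a single Partitioner may now be called from several goroutines at once. A sketch of the access pattern the lock makes safe (purely illustrative):

p := NewRandomPartitioner()
var wg sync.WaitGroup
for i := 0; i < 8; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Without p.m this would race on the shared *rand.Rand.
		_ = p.Partition(nil, 12)
	}()
}
wg.Wait()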
 

+ 17 - 0
prep_encoder.go

@@ -49,6 +49,14 @@ func (pe *prepEncoder) putBytes(in []byte) error {
 	return nil
 }
 
+func (pe *prepEncoder) putRawBytes(in []byte) error {
+	if len(in) > math.MaxInt32 {
+		return EncodingError
+	}
+	pe.length += len(in)
+	return nil
+}
+
 func (pe *prepEncoder) putString(in string) error {
 	pe.length += 2
 	if len(in) > math.MaxInt16 {
@@ -67,6 +75,15 @@ func (pe *prepEncoder) putInt32Array(in []int32) error {
 	return nil
 }
 
+func (pe *prepEncoder) putInt64Array(in []int64) error {
+	err := pe.putArrayLength(len(in))
+	if err != nil {
+		return err
+	}
+	pe.length += 8 * len(in)
+	return nil
+}
+
 // stackable
 
 func (pe *prepEncoder) push(in pushEncoder) {

+ 101 - 0
produce_message.go

@@ -0,0 +1,101 @@
+package sarama
+
+import "log"
+
+type produceMessage struct {
+	tp         topicPartition
+	key, value []byte
+	failures   uint32
+	sync       bool
+}
+
+type produceRequestBuilder []*produceMessage
+
+// If the message is synchronous, we manually send it and wait for a return.
+// Otherwise, we just hand it back to the producer to enqueue using the normal
+// method.
+func (msg *produceMessage) enqueue(p *Producer) error {
+	if !msg.sync {
+		return p.addMessage(msg)
+	}
+
+	var prb produceRequestBuilder = []*produceMessage{msg}
+	bp, err := p.brokerProducerFor(msg.tp)
+	if err != nil {
+		return err
+	}
+	errs := make(chan error, 1)
+	bp.flushRequest(p, prb, func(err error) {
+		errs <- err
+	})
+	return <-errs
+
+}
+
+func (msg *produceMessage) reenqueue(p *Producer) error {
+	if msg.failures < p.config.MaxDeliveryRetries {
+		msg.failures++
+		return msg.enqueue(p)
+	}
+	return nil
+}
+
+func (msg *produceMessage) hasTopicPartition(topic string, partition int32) bool {
+	return msg.tp.partition == partition && msg.tp.topic == topic
+}
+
+func (b produceRequestBuilder) toRequest(config *ProducerConfig) *ProduceRequest {
+	req := &ProduceRequest{RequiredAcks: config.RequiredAcks, Timeout: config.Timeout}
+
+	// If compression is enabled, we need to group messages by topic-partition and
+	// wrap them in MessageSets. We already discarded that grouping, so we
+	// inefficiently re-sort them. This could be optimized (i.e. pass a hash around
+	// rather than an array); not sure what the best way is.
+	if config.Compression != CompressionNone {
+		msgSets := make(map[topicPartition]*MessageSet)
+		for _, pmsg := range b {
+			msgSet, ok := msgSets[pmsg.tp]
+			if !ok {
+				msgSet = new(MessageSet)
+				msgSets[pmsg.tp] = msgSet
+			}
+
+			msgSet.addMessage(&Message{Codec: CompressionNone, Key: pmsg.key, Value: pmsg.value})
+		}
+		for tp, msgSet := range msgSets {
+			valBytes, err := encode(msgSet)
+			if err != nil {
+				log.Fatal(err) // if this happens, it's basically our fault.
+			}
+			msg := Message{Codec: config.Compression, Key: nil, Value: valBytes}
+			req.AddMessage(tp.topic, tp.partition, &msg)
+		}
+		return req
+	}
+
+	// Compression is not enabled. Dumb-ly append each request directly to the
+	// request, with no MessageSet wrapper.
+	for _, pmsg := range b {
+		msg := Message{Codec: config.Compression, Key: pmsg.key, Value: pmsg.value}
+		req.AddMessage(pmsg.tp.topic, pmsg.tp.partition, &msg)
+	}
+	return req
+}
+
+func (msg *produceMessage) byteSize() uint32 {
+	return uint32(len(msg.key) + len(msg.value))
+}
+
+func (b produceRequestBuilder) byteSize() uint32 {
+	var size uint32
+	for _, m := range b {
+		size += m.byteSize()
+	}
+	return size
+}
+
+func (b produceRequestBuilder) reverseEach(fn func(m *produceMessage)) {
+	for i := len(b) - 1; i >= 0; i-- {
+		fn(b[i])
+	}
+}
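
Note how retries preserve ordering: reverseEach walks a failed batch back-to-front, and brokerProducer.addMessage (in producer.go) prepends any message whose failures count is non-zero, so the batch lands ahead of messages queued while it was in flight, in its original order. A tiny worked sketch with plain slices (names illustrative):

queue := []string{"m4", "m5"}        // arrived while the batch was in flight
failed := []string{"m1", "m2", "m3"} // the batch that errored

for i := len(failed) - 1; i >= 0; i-- { // reverseEach: back-to-front
	// prepend, as addMessage does when msg.failures > 0
	queue = append([]string{failed[i]}, queue...)
}
// queue is now [m1 m2 m3 m4 m5]: original order, ahead of newer messages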

+ 37 - 0
produce_response.go

@@ -62,6 +62,29 @@ func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
+func (pr *ProduceResponse) encode(pe packetEncoder) error {
+	err := pe.putArrayLength(len(pr.Blocks))
+	if err != nil {
+		return err
+	}
+	for topic, partitions := range pr.Blocks {
+		err = pe.putString(topic)
+		if err != nil {
+			return err
+		}
+		err = pe.putArrayLength(len(partitions))
+		if err != nil {
+			return err
+		}
+		for id, prb := range partitions {
+			pe.putInt32(id)
+			pe.putInt16(int16(prb.Err))
+			pe.putInt64(prb.Offset)
+		}
+	}
+	return nil
+}
+
 func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
 	if pr.Blocks == nil {
 		return nil
@@ -73,3 +96,17 @@ func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceRespo
 
 	return pr.Blocks[topic][partition]
 }
+
+// Testing API
+
+func (pr *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) {
+	if pr.Blocks == nil {
+		pr.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
+	}
+	byTopic, ok := pr.Blocks[topic]
+	if !ok {
+		byTopic = make(map[int32]*ProduceResponseBlock)
+		pr.Blocks[topic] = byTopic
+	}
+	byTopic[partition] = &ProduceResponseBlock{Err: err}
+}

+ 388 - 82
producer.go

@@ -1,24 +1,75 @@
 package sarama
 
+import (
+	"fmt"
+	"sync"
+	"time"
+)
+
 // ProducerConfig is used to pass multiple configuration options to NewProducer.
+//
+// If MaxBufferTime and MaxBufferedBytes are both 0, messages are delivered
+// immediately, but if multiple messages are received while a roundtrip to kafka
+// is in progress, they are all combined into the next request. In this
+// mode, errors are not returned from QueueMessage, but over the Errors()
+// channel.
+//
+// With MaxBufferTime and/or MaxBufferedBytes set to values > 0, sarama will
+// buffer messages before sending, to reduce traffic.
 type ProducerConfig struct {
-	Partitioner  Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
-	RequiredAcks RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
-	Timeout      int32            // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks.
-	Compression  CompressionCodec // The type of compression to use on messages (defaults to no compression).
+	Partitioner        Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
+	RequiredAcks       RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
+	Timeout            int32            // The maximum time in ms the broker will wait for the receipt of the number of RequiredAcks.
+	Compression        CompressionCodec // The type of compression to use on messages (defaults to no compression).
+	MaxBufferedBytes   uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
+	MaxBufferTime      uint32           // The maximum number of milliseconds to buffer messages before sending to a broker.
+	MaxDeliveryRetries uint32           // The number of times to retry a failed message. You should always specify at least 1.
 }
 
-// Producer publishes Kafka messages on a given topic. It routes messages to the correct broker, refreshing metadata as appropriate,
-// and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
-// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
+// Producer publishes Kafka messages. It routes messages to the correct broker
+// for the provided topic-partition, refreshing metadata as appropriate, and
+// parses responses for errors. You must call Close() on a producer to avoid
+// leaks: it may not be garbage-collected automatically when it passes out of
+// scope (this is in addition to calling Close on the underlying client, which
+// is still necessary).
+//
+// The default values for MaxBufferedBytes and MaxBufferTime cause sarama to
+// deliver messages immediately, but to buffer subsequent messages while a
+// previous request is in-flight. This is often the correct behaviour.
+//
+// If synchronous operation is desired, you can use SendMessage. This will cause
+// sarama to block until the broker has returned a value. Normally, you will
+// want to use QueueMessage instead, and read the error back from the Errors()
+// channel. Note that when using QueueMessage, you *must* read the values from
+// the Errors() channel, or sarama will block indefinitely after a few requests.
 type Producer struct {
-	client *Client
-	topic  string
-	config ProducerConfig
+	client          *Client
+	config          ProducerConfig
+	brokerProducers map[*Broker]*brokerProducer
+	m               sync.RWMutex
+	errors          chan error
+	deliveryLocks   map[topicPartition]chan bool
+	dm              sync.RWMutex
+}
+
+type brokerProducer struct {
+	mapM          sync.Mutex
+	messages      map[topicPartition][]*produceMessage
+	bufferedBytes uint32
+	flushNow      chan bool
+	broker        *Broker
+	stopper       chan bool
+	done          chan bool
+	hasMessages   chan bool
 }
 
-// NewProducer creates a new Producer using the given client. The resulting producer will publish messages on the given topic.
-func NewProducer(client *Client, topic string, config *ProducerConfig) (*Producer, error) {
+type topicPartition struct {
+	topic     string
+	partition int32
+}
+
+// NewProducer creates a new Producer using the given client.
+func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 	if config == nil {
 		config = new(ProducerConfig)
 	}
@@ -31,127 +82,382 @@ func NewProducer(client *Client, topic string, config *ProducerConfig) (*Produce
 		return nil, ConfigurationError("Invalid Timeout")
 	}
 
+	if config.MaxDeliveryRetries < 1 {
+		Logger.Println("Warning: config.MaxDeliveryRetries is set dangerously low. This will lead to occasional data loss.")
+	}
+
 	if config.Partitioner == nil {
 		config.Partitioner = NewRandomPartitioner()
 	}
 
-	if topic == "" {
-		return nil, ConfigurationError("Empty topic")
+	if config.MaxBufferedBytes == 0 {
+		config.MaxBufferedBytes = 1
 	}
 
-	p := new(Producer)
-	p.client = client
-	p.topic = topic
-	p.config = *config
+	return &Producer{
+		client:          client,
+		config:          *config,
+		errors:          make(chan error, 16),
+		deliveryLocks:   make(map[topicPartition]chan bool),
+		brokerProducers: make(map[*Broker]*brokerProducer),
+	}, nil
+}
 
-	return p, nil
+// When operating in asynchronous mode, provides access to errors generated
+// while parsing ProduceResponses from kafka. Should never be called in
+// synchronous mode.
+func (p *Producer) Errors() chan error {
+	return p.errors
 }
 
-// Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
-// a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
-// on the underlying client.
+// Close shuts down the producer and flushes any messages it may have buffered.
+// You must call this function before a producer object passes out of scope, as
+// it may otherwise leak memory. You must call this before calling Close on the
+// underlying client.
 func (p *Producer) Close() error {
-	// no-op for now, adding for consistency and so the API doesn't change when we add buffering
-	// (which will require a goroutine, which will require a close method in order to flush the buffer).
+	for _, bp := range p.brokerProducers {
+		bp.Close()
+	}
 	return nil
 }
 
-// SendMessage sends a message with the given key and value. The partition to send to is selected by the Producer's Partitioner.
-// To send strings as either key or value, see the StringEncoder type.
-func (p *Producer) SendMessage(key, value Encoder) error {
-	return p.safeSendMessage(key, value, true)
+// QueueMessage sends a message with the given key and value to the given topic.
+// The partition to send to is selected by the Producer's Partitioner. To send
+// strings as either key or value, see the StringEncoder type.
+//
+// QueueMessage uses buffering semantics to reduce the number of requests to the
+// broker. The buffer logic is tunable with config.MaxBufferedBytes and
+// config.MaxBufferTime.
+//
+// QueueMessage will return an error if it's unable to construct the message
+// (unlikely), but network and response errors must be read from Errors(), since
+// QueueMessage uses asynchronous delivery. Note that you MUST read back from
+// Errors(), otherwise the producer will stall after some number of errors.
+//
+// If you care about message ordering, you should not call QueueMessage and
+// SendMessage on the same Producer. Either, used alone, preserves ordering,
+// however.
+func (p *Producer) QueueMessage(topic string, key, value Encoder) error {
+	return p.genericSendMessage(topic, key, value, false)
 }
 
-func (p *Producer) choosePartition(key Encoder) (int32, error) {
-	partitions, err := p.client.Partitions(p.topic)
-	if err != nil {
-		return -1, err
-	}
+// SendMessage sends a message with the given key and value to the given topic.
+// The partition to send to is selected by the Producer's Partitioner. To send
+// strings as either key or value, see the StringEncoder type.
+//
+// Unlike QueueMessage, SendMessage operates synchronously, and will block until
+// the response is received from the broker, returning any error generated in
+// the process. Reading from Errors() may interfere with the operation of
+// SendMessage().
+//
+// If you care about message ordering, you should not call QueueMessage and
+// SendMessage on the same Producer.
+func (p *Producer) SendMessage(topic string, key, value Encoder) (err error) {
+	return p.genericSendMessage(topic, key, value, true)
+}
 
-	numPartitions := int32(len(partitions))
+func (p *Producer) genericSendMessage(topic string, key, value Encoder, synchronous bool) (err error) {
+	var keyBytes, valBytes []byte
 
-	choice := p.config.Partitioner.Partition(key, numPartitions)
+	if key != nil {
+		if keyBytes, err = key.Encode(); err != nil {
+			return err
+		}
+	}
+	if value != nil {
+		if valBytes, err = value.Encode(); err != nil {
+			return err
+		}
+	}
 
-	if choice < 0 || choice >= numPartitions {
-		return -1, InvalidPartition
+	partition, err := p.choosePartition(topic, key)
+	if err != nil {
+		return err
 	}
 
-	return partitions[choice], nil
+	// produce_message.go
+	msg := &produceMessage{
+		tp:       topicPartition{topic, partition},
+		key:      keyBytes,
+		value:    valBytes,
+		failures: 0,
+		sync:     synchronous,
+	}
+
+	// produce_message.go
+	return msg.enqueue(p)
 }
 
-func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
-	partition, err := p.choosePartition(key)
+func (p *Producer) addMessage(msg *produceMessage) error {
+	bp, err := p.brokerProducerFor(msg.tp)
 	if err != nil {
 		return err
 	}
+	bp.addMessage(msg, p.config.MaxBufferedBytes)
+	return nil
+}
 
-	var keyBytes []byte
-	var valBytes []byte
+func (p *Producer) brokerProducerFor(tp topicPartition) (*brokerProducer, error) {
+	broker, err := p.client.Leader(tp.topic, tp.partition)
+	if err != nil {
+		return nil, err
+	}
 
-	if key != nil {
-		keyBytes, err = key.Encode()
-		if err != nil {
-			return err
+	p.m.RLock()
+	bp, ok := p.brokerProducers[broker]
+	p.m.RUnlock()
+	if !ok {
+		p.m.Lock()
+		bp, ok = p.brokerProducers[broker]
+		if !ok {
+			bp = p.newBrokerProducer(broker)
+			p.brokerProducers[broker] = bp
 		}
+		p.m.Unlock()
 	}
-	valBytes, err = value.Encode()
-	if err != nil {
-		return err
+
+	return bp, nil
+}
+
+func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
+	bp := &brokerProducer{
+		messages:    make(map[topicPartition][]*produceMessage),
+		flushNow:    make(chan bool, 1),
+		broker:      broker,
+		stopper:     make(chan bool),
+		done:        make(chan bool),
+		hasMessages: make(chan bool, 1),
 	}
 
-	broker, err := p.client.Leader(p.topic, partition)
-	if err != nil {
-		return err
+	maxBufferTime := time.Duration(p.config.MaxBufferTime) * time.Millisecond
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	go func() {
+		timer := time.NewTimer(maxBufferTime)
+		wg.Done()
+		for {
+			select {
+			case <-bp.flushNow:
+				bp.flush(p)
+			case <-timer.C:
+				bp.flushIfAnyMessages(p)
+			case <-bp.stopper:
+				delete(p.brokerProducers, bp.broker)
+				bp.flushIfAnyMessages(p)
+				p.client.disconnectBroker(bp.broker)
+				close(bp.flushNow)
+				close(bp.hasMessages)
+				close(bp.done)
+				return
+			}
+			timer.Reset(maxBufferTime)
+		}
+	}()
+	wg.Wait() // don't return until the G has started
+
+	return bp
+}
+
+func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32) {
+	bp.mapM.Lock()
+	if msg.failures > 0 {
+		// Prepend: Deliver first, before any more recently-added messages.
+		bp.messages[msg.tp] = append([]*produceMessage{msg}, bp.messages[msg.tp]...)
+	} else {
+		// Append
+		bp.messages[msg.tp] = append(bp.messages[msg.tp], msg)
 	}
+	bp.bufferedBytes += msg.byteSize()
 
-	if p.config.Compression != CompressionNone {
-		set := new(MessageSet)
-		set.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes})
-		valBytes, err = encode(set)
-		if err != nil {
-			return err
+	select {
+	case bp.hasMessages <- true:
+	default:
+	}
+
+	bp.mapM.Unlock()
+	bp.flushIfOverCapacity(maxBufferBytes)
+}
+
+func (bp *brokerProducer) flushIfOverCapacity(maxBufferBytes uint32) {
+	if bp.bufferedBytes > maxBufferBytes {
+		select {
+		case bp.flushNow <- true:
+		default:
+		}
+	}
+}
+
+func (bp *brokerProducer) flushIfAnyMessages(p *Producer) {
+	select {
+	case <-bp.hasMessages:
+		select {
+		case bp.hasMessages <- true:
+		default:
+		}
+		bp.flush(p)
+	default:
+	}
+}
+
+func (bp *brokerProducer) flush(p *Producer) {
+	var prb produceRequestBuilder
+
+	// only deliver messages for topic-partitions that are not currently being delivered.
+	bp.mapM.Lock()
+	for tp, messages := range bp.messages {
+		if len(messages) > 0 && p.tryAcquireDeliveryLock(tp) {
+			defer p.releaseDeliveryLock(tp)
+			prb = append(prb, messages...)
+			delete(bp.messages, tp)
 		}
 	}
+	bp.mapM.Unlock()
 
-	request := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: p.config.Timeout}
-	request.AddMessage(p.topic, partition, &Message{Codec: p.config.Compression, Key: keyBytes, Value: valBytes})
+	if len(prb) > 0 {
+		bp.mapM.Lock()
+		bp.bufferedBytes -= prb.byteSize()
+		bp.mapM.Unlock()
+
+		bp.flushRequest(p, prb, func(err error) {
+			p.errors <- err
+		})
+	}
+}
+
+func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, errorCb func(error)) {
+	// produce_message.go
+	req := prb.toRequest(&p.config)
+	response, err := bp.broker.Produce(p.client.id, req)
 
-	response, err := broker.Produce(p.client.id, request)
 	switch err {
 	case nil:
 		break
 	case EncodingError:
-		return err
+		// No sense in retrying; it'll just fail again. But what about all the other
+		// messages that weren't invalid? Really, this is a "shit's broke real good"
+		// scenario, so logging it and moving on is probably acceptable.
+		Logger.Printf("[DATA LOSS] EncodingError! Dropped %d messages.\n", len(prb))
+		errorCb(err)
+		return
 	default:
-		if !retry {
-			return err
+		bp.Close()
+
+		overlimit := 0
+		prb.reverseEach(func(msg *produceMessage) {
+			if err := msg.reenqueue(p); err != nil {
+				overlimit++
+			}
+		})
+		if overlimit > 0 {
+			Logger.Printf("[DATA LOSS] %d messages exceeded the retry limit of %d and were dropped.\n",
+				overlimit, p.config.MaxDeliveryRetries)
+			errorCb(fmt.Errorf("Dropped %d messages that exceeded the retry limit", overlimit))
 		}
-		p.client.disconnectBroker(broker)
-		return p.safeSendMessage(key, value, false)
+		return
 	}
 
+	// When does this ever actually happen, and why don't we explode when it does?
+	// This seems bad.
 	if response == nil {
-		return nil
+		errorCb(nil)
+		return
 	}
 
-	block := response.GetBlock(p.topic, partition)
-	if block == nil {
-		return IncompleteResponse
-	}
+	for topic, d := range response.Blocks {
+		for partition, block := range d {
+			if block == nil {
+				// IncompleteResponse. Here we just drop all the messages; we don't know whether
+				// they were successfully sent or not. Non-ideal, but how often does it happen?
+				Logger.Printf("[DATA LOSS] IncompleteResponse: up to %d messages for %s:%d are in an unknown state\n",
+					len(prb), topic, partition)
+				continue
+			}
+			switch block.Err {
+			case NoError:
+				// All the messages for this topic-partition were delivered successfully!
+				// Unlock delivery for this topic-partition and discard the produceMessage objects.
+				errorCb(nil)
+			case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
+				p.client.RefreshTopicMetadata(topic)
 
-	switch block.Err {
-	case NoError:
-		return nil
-	case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
-		if !retry {
-			return block.Err
+				overlimit := 0
+				prb.reverseEach(func(msg *produceMessage) {
+					if msg.hasTopicPartition(topic, partition) {
+						if err := msg.reenqueue(p); err != nil {
+							overlimit++
+						}
+					}
+				})
+				if overlimit > 0 {
+					Logger.Printf("[DATA LOSS] %d messages exceeded the retry limit of %d and were dropped.\n",
+						overlimit, p.config.MaxDeliveryRetries)
+				}
+			default:
+				Logger.Printf("[DATA LOSS] Non-retriable error from kafka! Dropped up to %d messages for %s:%d.\n",
+					len(prb), topic, partition)
+			}
 		}
-		err = p.client.RefreshTopicMetadata(p.topic)
-		if err != nil {
-			return err
+	}
+}
+
+func (bp *brokerProducer) Close() error {
+	select {
+	case <-bp.stopper:
+		return fmt.Errorf("already closed or closing")
+	default:
+		close(bp.stopper)
+		<-bp.done
+	}
+	return nil
+}
+
+func (p *Producer) tryAcquireDeliveryLock(tp topicPartition) bool {
+	p.dm.RLock()
+	ch, ok := p.deliveryLocks[tp]
+	p.dm.RUnlock()
+	if !ok {
+		p.dm.Lock()
+		ch, ok = p.deliveryLocks[tp]
+		if !ok {
+			ch = make(chan bool, 1)
+			p.deliveryLocks[tp] = ch
 		}
-		return p.safeSendMessage(key, value, false)
+		p.dm.Unlock()
 	}
 
-	return block.Err
+	select {
+	case ch <- true:
+		return true
+	default:
+		return false
+	}
+}
+
+func (p *Producer) releaseDeliveryLock(tp topicPartition) {
+	p.dm.RLock()
+	ch := p.deliveryLocks[tp]
+	p.dm.RUnlock()
+	select {
+	case <-ch:
+	default:
+		panic("Serious logic bug: releaseDeliveryLock called without acquiring lock first.")
+	}
+}
+
+func (p *Producer) choosePartition(topic string, key Encoder) (int32, error) {
+	partitions, err := p.client.Partitions(topic)
+	if err != nil {
+		return -1, err
+	}
+
+	numPartitions := int32(len(partitions))
+
+	choice := p.config.Partitioner.Partition(key, numPartitions)
+
+	if choice < 0 || choice >= numPartitions {
+		return -1, InvalidPartition
+	}
+
+	return partitions[choice], nil
 }
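
Pulling the new API together: the producer is no longer bound to one topic, QueueMessage delivers asynchronously with delivery errors surfacing on Errors(), and SendMessage blocks until the broker responds and returns the error directly. A short usage sketch of the asynchronous path, assuming an already-connected *Client (all values are illustrative):

producer, err := NewProducer(client, &ProducerConfig{
	RequiredAcks:       WaitForLocal,
	MaxBufferedBytes:   1024, // flush once ~1KB is buffered for a broker...
	MaxBufferTime:      500,  // ...or after 500ms, whichever comes first
	MaxDeliveryRetries: 3,
})
if err != nil {
	panic(err)
}
defer producer.Close()

// QueueMessage only returns construction errors; delivery errors arrive on
// Errors(), which must be drained or the producer eventually stalls.
go func() {
	for err := range producer.Errors() {
		if err != nil {
			Logger.Println("delivery failed:", err)
		}
	}
}()

if err := producer.QueueMessage("my_topic", nil, StringEncoder("testing 123")); err != nil {
	panic(err)
}

Per the doc comment above, QueueMessage and SendMessage each preserve ordering on their own, but mixing them on one Producer does not.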

+ 303 - 44
producer_test.go

@@ -1,67 +1,307 @@
 package sarama
 
 import (
-	"encoding/binary"
 	"fmt"
 	"testing"
+	"time"
 )
 
+const TestMessage = "ABC THE MESSAGE"
+
 func TestSimpleProducer(t *testing.T) {
-	responses := make(chan []byte, 1)
-	extraResponses := make(chan []byte)
-	mockBroker := NewMockBroker(t, responses)
-	mockExtra := NewMockBroker(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	responses <- response
-	go func() {
-		for i := 0; i < 10; i++ {
-			msg := []byte{
-				0x00, 0x00, 0x00, 0x01,
-				0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-				0x00, 0x00, 0x00, 0x01,
-				0x00, 0x00, 0x00, 0x00,
-				0x00, 0x00,
-				0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
-			binary.BigEndian.PutUint64(msg[23:], uint64(i))
-			extraResponses <- msg
-		}
-	}()
 
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
+	defer mb1.Close()
+	defer mb2.Close()
+
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mdr.AddTopicPartition("my_topic", 0, 2)
+	mb1.Returns(mdr)
+
+	pr := new(ProduceResponse)
+	pr.AddTopicPartition("my_topic", 0, NoError)
+	mb2.Returns(pr)
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer client.Close()
 
-	producer, err := NewProducer(client, "my_topic", &ProducerConfig{RequiredAcks: WaitForLocal})
+	producer, err := NewProducer(client, &ProducerConfig{
+		RequiredAcks:  WaitForLocal,
+		MaxBufferTime: 1000000, // "never"
+		// So that we flush once, after the 10th message.
+		MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
+	})
+	defer producer.Close()
+
+	// flush only on 10th and final message
+	returns := []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
+	for _, f := range returns {
+		sendMessage(t, producer, "my_topic", TestMessage, f)
+	}
+}
+
+func TestSimpleSyncProducer(t *testing.T) {
+
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
+	defer mb1.Close()
+	defer mb2.Close()
+
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mdr.AddTopicPartition("my_topic", 1, 2)
+	mb1.Returns(mdr)
+
+	pr := new(ProduceResponse)
+	pr.AddTopicPartition("my_topic", 1, NoError)
+
+	for i := 0; i < 10; i++ {
+		mb2.Returns(pr)
+	}
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
+
+	producer, err := NewProducer(client, &ProducerConfig{
+		RequiredAcks:  WaitForLocal,
+		MaxBufferTime: 1000000, // "never"
+		// So that we flush once, after the 10th message.
+		MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
+	})
 	defer producer.Close()
 
 	for i := 0; i < 10; i++ {
-		err = producer.SendMessage(nil, StringEncoder("ABC THE MESSAGE"))
+		sendSyncMessage(t, producer, "my_topic", TestMessage)
+	}
+}
+
+func TestMultipleFlushes(t *testing.T) {
+
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
+	defer mb1.Close()
+	defer mb2.Close()
+
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mdr.AddTopicPartition("my_topic", 0, 2)
+	mb1.Returns(mdr)
+
+	pr := new(ProduceResponse)
+	pr.AddTopicPartition("my_topic", 0, NoError)
+	pr.AddTopicPartition("my_topic", 0, NoError)
+	mb2.Returns(pr)
+	mb2.Returns(pr) // yes, twice.
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	producer, err := NewProducer(client, &ProducerConfig{
+		RequiredAcks:  WaitForLocal,
+		MaxBufferTime: 1000000, // "never"
+		// So that we flush once, after the 5th message.
+		MaxBufferedBytes: uint32((len(TestMessage) * 5) - 1),
+	})
+	defer producer.Close()
+
+	returns := []int{0, 0, 0, 0, 1, 0, 0, 0, 0, 1}
+	for _, f := range returns {
+		sendMessage(t, producer, "my_topic", TestMessage, f)
+	}
+}
+
+func TestMultipleProducer(t *testing.T) {
+
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
+	mb3 := NewMockBroker(t, 3)
+	defer mb1.Close()
+	defer mb2.Close()
+	defer mb3.Close()
+
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mdr.AddBroker(mb3.Addr(), int32(mb3.BrokerID()))
+	mdr.AddTopicPartition("topic_a", 0, 2)
+	mdr.AddTopicPartition("topic_b", 0, 3)
+	mdr.AddTopicPartition("topic_c", 0, 3)
+	mb1.Returns(mdr)
+
+	pr1 := new(ProduceResponse)
+	pr1.AddTopicPartition("topic_a", 0, NoError)
+	mb2.Returns(pr1)
+
+	pr2 := new(ProduceResponse)
+	pr2.AddTopicPartition("topic_b", 0, NoError)
+	pr2.AddTopicPartition("topic_c", 0, NoError)
+	mb3.Returns(pr2)
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	producer, err := NewProducer(client, &ProducerConfig{
+		RequiredAcks:  WaitForLocal,
+		MaxBufferTime: 1000000, // "never"
+		// So that we flush once, after the 10th message.
+		MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
+	})
+	defer producer.Close()
+
+	// flush only on 10th and final message
+	returns := []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
+	for _, f := range returns {
+		sendMessage(t, producer, "topic_a", TestMessage, f)
+	}
+
+	// no flushes
+	returns = []int{0, 0, 0, 0, 0}
+	for _, f := range returns {
+		sendMessage(t, producer, "topic_b", TestMessage, f)
+	}
+
+	// flush both topic_b and topic_c on 5th (ie. 10th for this broker)
+	returns = []int{0, 0, 0, 0, 2}
+	for _, f := range returns {
+		sendMessage(t, producer, "topic_c", TestMessage, f)
+	}
+}
+
+// Here we test that when two messages are sent in the same buffered request,
+// and more messages are enqueued while the request is pending, everything
+// happens correctly; that is, the first messages are retried before the next
+// batch is allowed to submit.
+func TestFailureRetry(t *testing.T) {
+	t.Skip("not yet working after mockbroker refactor")
+
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
+	mb3 := NewMockBroker(t, 3)
+
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
+	mdr.AddBroker(mb3.Addr(), int32(mb3.BrokerID()))
+	mdr.AddTopicPartition("topic_a", 0, 2)
+	mdr.AddTopicPartition("topic_b", 0, 3)
+	mdr.AddTopicPartition("topic_c", 0, 3)
+	mb1.Returns(mdr)
+
+	/* mb1.ExpectMetadataRequest(). */
+	/* 	AddBroker(mb2). */
+	/* 	AddBroker(mb3). */
+	/* 	AddTopicPartition("topic_a", 0, 2). */
+	/* 	AddTopicPartition("topic_b", 0, 2). */
+	/* 	AddTopicPartition("topic_c", 0, 3) */
+
+	pr := new(ProduceResponse)
+	pr.AddTopicPartition("topic_a", 0, NoError)
+	pr.AddTopicPartition("topic_b", 0, NotLeaderForPartition)
+	mb2.Returns(pr)
+
+	/* mb2.ExpectProduceRequest(). */
+	/* 	AddTopicPartition("topic_a", 0, 1, NoError). */
+	/* 	AddTopicPartition("topic_b", 0, 1, NotLeaderForPartition) */
+
+	// The fact that mb2 is chosen here is not well-defined. In theory,
+	// it's a random choice between mb1, mb2, and mb3. Go's hash iteration
+	// isn't quite as random as claimed, though, it seems. Maybe because
+	// the same random seed is used each time?
+	mdr2 := new(MetadataResponse)
+	mdr2.AddBroker(mb3.Addr(), int32(mb3.BrokerID()))
+	mdr2.AddTopicPartition("topic_b", 0, 3)
+	mb2.Returns(mdr2)
+
+	/* mb2.ExpectMetadataRequest(). */
+	/* 	AddBroker(mb3). */
+	/* 	AddTopicPartition("topic_b", 0, 3) */
+
+	pr2 := new(ProduceResponse)
+	pr2.AddTopicPartition("topic_c", 0, NoError)
+	pr2.AddTopicPartition("topic_b", 0, NoError)
+	mb3.Returns(pr2)
+
+	/* mb3.ExpectProduceRequest(). */
+	/* 	AddTopicPartition("topic_c", 0, 1, NoError). */
+	/* 	AddTopicPartition("topic_b", 0, 1, NoError) */
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	producer, err := NewProducer(client, &ProducerConfig{
+		RequiredAcks:  WaitForLocal,
+		MaxBufferTime: 1000000, // "never"
+		// So that we flush after the 2nd message.
+		MaxBufferedBytes:   uint32((len(TestMessage) * 2) - 1),
+		MaxDeliveryRetries: 1,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+
+	// Sent to mb3; does not flush because it's only half the cap.
+	// mb1: [__]
+	// mb2: [__]
+	// mb3: [__]
+	sendMessage(t, producer, "topic_c", TestMessage, 0)
+	// mb1: [__]
+	// mb2: [__]
+	// mb3: [X_]
+
+	// Sent to mb2; does not flush because it's only half the cap.
+	sendMessage(t, producer, "topic_a", TestMessage, 0)
+	// mb1: [__]
+	// mb2: [X_]
+	// mb3: [X_]
+
+	// Sent to mb2; flushes, errors (retriable).
+	// Three messages will be received:
+	//   * NoError for topic_a;
+	//   * NoError for topic_b;
+	//   * NoError for topic_c.
+	sendMessage(t, producer, "topic_b", TestMessage, 2)
+	// mb1: [__]
+	// mb2: [XX] <- flush!
+	// mb3: [X_]
+
+	// The topic_b message errors, and we should wind up here:
+
+	// mb1: [__]
+	// mb2: [__]
+	// mb3: [XX] <- topic_b reassigned to mb3 by metadata refresh, flushes.
+
+	defer mb1.Close()
+	defer mb2.Close()
+}
+
+func readMessage(t *testing.T, ch chan error) {
+	select {
+	case err := <-ch:
 		if err != nil {
 			t.Error(err)
 		}
+	case <-time.After(1 * time.Second):
+		t.Error(fmt.Errorf("Message was never received"))
+	}
+}
+
+func assertNoMessages(t *testing.T, ch chan error) {
+	select {
+	case x := <-ch:
+		t.Fatal(fmt.Errorf("unexpected value received: %#v", x))
+	case <-time.After(1 * time.Millisecond):
 	}
 }
 
@@ -74,16 +314,35 @@ func ExampleProducer() {
 	}
 	defer client.Close()
 
-	producer, err := NewProducer(client, "my_topic", &ProducerConfig{RequiredAcks: WaitForLocal})
+	producer, err := NewProducer(client, &ProducerConfig{RequiredAcks: WaitForLocal})
 	if err != nil {
 		panic(err)
 	}
 	defer producer.Close()
 
-	err = producer.SendMessage(nil, StringEncoder("testing 123"))
+	err = producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
 	if err != nil {
 		panic(err)
 	} else {
 		fmt.Println("> message sent")
 	}
 }
+
+func sendMessage(t *testing.T, producer *Producer, topic string, key string, expectedResponses int) {
+	err := producer.QueueMessage(topic, nil, StringEncoder(key))
+	if err != nil {
+		t.Error(err)
+	}
+	for i := 0; i < expectedResponses; i++ {
+		readMessage(t, producer.Errors())
+	}
+	assertNoMessages(t, producer.Errors())
+}
+
+func sendSyncMessage(t *testing.T, producer *Producer, topic string, key string) {
+	err := producer.SendMessage(topic, nil, StringEncoder(key))
+	if err != nil {
+		t.Error(err)
+	}
+	assertNoMessages(t, producer.Errors())
+}

+ 14 - 0
real_encoder.go

@@ -37,6 +37,12 @@ func (re *realEncoder) putArrayLength(in int) error {
 
 // collection
 
+func (re *realEncoder) putRawBytes(in []byte) error {
+	copy(re.raw[re.off:], in)
+	re.off += len(in)
+	return nil
+}
+
 func (re *realEncoder) putBytes(in []byte) error {
 	if in == nil {
 		re.putInt32(-1)
@@ -63,6 +69,14 @@ func (re *realEncoder) putInt32Array(in []int32) error {
 	return nil
 }
 
+func (re *realEncoder) putInt64Array(in []int64) error {
+	re.putArrayLength(len(in))
+	for _, val := range in {
+		re.putInt64(val)
+	}
+	return nil
+}
+
 // stacks
 
 func (re *realEncoder) push(in pushEncoder) {