
Rename MultiProducer to Producer, removing previous Producer API

Burke Libbey, 12 years ago
commit b0454c0871
4 changed files with 583 additions and 710 deletions
  1. multiproducer.go (+0, -424)
  2. multiproducer_test.go (+0, -234)
  3. producer.go (+399, -36)
  4. producer_test.go (+184, -16)
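
The practical upshot of the rename: the old single-topic constructor NewProducer(client, topic, config) and its two-argument SendMessage(key, value) are removed, and the former MultiProducer API now answers to Producer, with the topic supplied per message. A minimal usage sketch against the new API, mirroring the updated ExampleProducer further down in this diff (the broker address and the github.com/Shopify/sarama import path are assumptions, not part of this commit):

package main

import "github.com/Shopify/sarama" // assumed import path

func main() {
	client, err := sarama.NewClient("client_id", []string{"localhost:9092"}, nil) // address assumed
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Before this commit: NewProducer(client, "my_topic", config).
	// The topic argument is gone; the buffering options from MultiProducerConfig now live on ProducerConfig.
	producer, err := sarama.NewProducer(client, &sarama.ProducerConfig{RequiredAcks: sarama.WaitForLocal})
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// Before this commit: SendMessage(key, value). The topic now travels with each message.
	err = producer.SendMessage("my_topic", nil, sarama.StringEncoder("testing 123"))
	if err != nil {
		panic(err)
	}
}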

+ 0 - 424
multiproducer.go

@@ -1,424 +0,0 @@
-package sarama
-
-import (
-	"sync"
-	"time"
-)
-
-type MultiProducerConfig struct {
-	Partitioner        Partitioner
-	RequiredAcks       RequiredAcks
-	Timeout            int32
-	Compression        CompressionCodec
-	MaxBufferBytes     uint32
-	MaxBufferTime      uint32
-	MaxDeliveryRetries uint32
-}
-
-type MultiProducer struct {
-	client          *Client
-	config          MultiProducerConfig
-	brokerProducers map[*Broker]*brokerProducer
-	m               sync.RWMutex
-	errors          chan error
-	deliveryLocks   map[topicPartition]chan bool
-	dm              sync.RWMutex
-}
-
-type brokerProducer struct {
-	mapM          sync.Mutex
-	messages      map[string]map[int32][]*produceMessage
-	bufferedBytes uint32
-	flushNow      chan bool
-	broker        *Broker
-	stopper       chan bool
-	hasMessages   chan bool
-}
-
-type produceMessage struct {
-	topic      string
-	partition  int32
-	key, value []byte
-	failures   uint32
-}
-
-type topicPartition struct {
-	topic     string
-	partition int32
-}
-
-func NewMultiProducer(client *Client, config *MultiProducerConfig) (*MultiProducer, error) {
-	if config == nil {
-		config = new(MultiProducerConfig)
-	}
-
-	if config.RequiredAcks < -1 {
-		return nil, ConfigurationError("Invalid RequiredAcks")
-	}
-
-	if config.Timeout < 0 {
-		return nil, ConfigurationError("Invalid Timeout")
-	}
-
-	if config.Partitioner == nil {
-		config.Partitioner = NewRandomPartitioner()
-	}
-
-	if config.MaxBufferBytes == 0 {
-		config.MaxBufferBytes = 1
-	}
-
-	return &MultiProducer{
-		client:          client,
-		config:          *config,
-		errors:          make(chan error, 16),
-		deliveryLocks:   make(map[topicPartition]chan bool),
-		brokerProducers: make(map[*Broker]*brokerProducer),
-	}, nil
-}
-
-func (p *MultiProducer) Errors() chan error {
-	if p.isSynchronous() {
-		panic("use of Errors() is not permitted in synchronous mode.")
-	} else {
-		return p.errors
-	}
-}
-
-func (p *MultiProducer) Close() error {
-	return nil
-}
-
-func (p *MultiProducer) SendMessage(topic string, key, value Encoder) (err error) {
-	var keyBytes, valBytes []byte
-
-	if key != nil {
-		if keyBytes, err = key.Encode(); err != nil {
-			return err
-		}
-	}
-	if value != nil {
-		if valBytes, err = value.Encode(); err != nil {
-			return err
-		}
-	}
-
-	partition, err := p.choosePartition(topic, key)
-	if err != nil {
-		return err
-	}
-
-	msg := &produceMessage{
-		topic:     topic,
-		partition: partition,
-		key:       keyBytes,
-		value:     valBytes,
-		failures:  0,
-	}
-
-	return p.addMessage(msg, false)
-}
-
-func (p *MultiProducer) choosePartition(topic string, key Encoder) (int32, error) {
-	partitions, err := p.client.Partitions(topic)
-	if err != nil {
-		return -1, err
-	}
-
-	numPartitions := int32(len(partitions))
-
-	choice := p.config.Partitioner.Partition(key, numPartitions)
-
-	if choice < 0 || choice >= numPartitions {
-		return -1, InvalidPartition
-	}
-
-	return partitions[choice], nil
-}
-
-func (p *MultiProducer) addMessage(msg *produceMessage, isRetry bool) error {
-	broker, err := p.client.Leader(msg.topic, msg.partition)
-	if err != nil {
-		return err
-	}
-
-	bp := p.brokerProducerFor(broker)
-	bp.addMessage(msg, p.config.MaxBufferBytes, isRetry)
-
-	if p.isSynchronous() {
-		return <-p.errors
-	}
-	return nil
-}
-
-func (p *MultiProducer) isSynchronous() bool {
-	return p.config.MaxBufferBytes < 2 && p.config.MaxBufferTime == 0
-}
-
-func (p *MultiProducer) brokerProducerFor(broker *Broker) *brokerProducer {
-	p.m.RLock()
-	bp, ok := p.brokerProducers[broker]
-	p.m.RUnlock()
-	if !ok {
-		p.m.Lock()
-		bp, ok = p.brokerProducers[broker]
-		if !ok {
-			bp = p.newBrokerProducer(broker)
-			p.brokerProducers[broker] = bp
-		}
-		p.m.Unlock()
-	}
-	return bp
-}
-
-func (p *MultiProducer) newBrokerProducer(broker *Broker) *brokerProducer {
-	bp := &brokerProducer{
-		messages:    make(map[string]map[int32][]*produceMessage),
-		flushNow:    make(chan bool, 1),
-		broker:      broker,
-		stopper:     make(chan bool),
-		hasMessages: make(chan bool, 1),
-	}
-
-	maxBufferTime := time.Duration(p.config.MaxBufferTime) * time.Millisecond
-
-	var wg sync.WaitGroup
-	wg.Add(1)
-
-	go func() {
-		timer := time.NewTimer(maxBufferTime)
-		wg.Done()
-		for {
-			select {
-			case <-bp.flushNow:
-				bp.flush(p)
-			case <-timer.C:
-				bp.flush(p)
-			case <-bp.stopper:
-				p.m.Lock()
-				delete(p.brokerProducers, bp.broker)
-				p.m.Unlock()
-				bp.flush(p)
-				p.client.disconnectBroker(bp.broker)
-				close(bp.flushNow)
-				close(bp.hasMessages)
-				return
-			}
-			timer.Reset(maxBufferTime)
-		}
-	}()
-	wg.Wait()
-
-	return bp
-}
-
-func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32, isRetry bool) {
-	bp.mapM.Lock()
-	forTopic, ok := bp.messages[msg.topic]
-	if !ok {
-		forTopic = make(map[int32][]*produceMessage)
-		bp.messages[msg.topic] = forTopic
-	}
-	if isRetry {
-		// Prepend: Deliver first.
-		forTopic[msg.partition] = append([]*produceMessage{msg}, forTopic[msg.partition]...)
-	} else {
-		// Append
-		forTopic[msg.partition] = append(forTopic[msg.partition], msg)
-	}
-	bp.bufferedBytes += uint32(len(msg.key) + len(msg.value))
-
-	select {
-	case bp.hasMessages <- true:
-	default:
-	}
-
-	bp.mapM.Unlock()
-	if bp.bufferedBytes > maxBufferBytes {
-		// TODO: decrement this later on
-		bp.tryFlush()
-	}
-}
-
-func (bp *brokerProducer) tryFlush() {
-	select {
-	case bp.flushNow <- true:
-	default:
-	}
-}
-
-func (bp *brokerProducer) flush(p *MultiProducer) {
-	// try to acquire delivery locks for each topic-partition involved.
-
-	var messagesToSend []*produceMessage
-
-	<-bp.hasMessages // wait for a message if the BP currently has none.
-
-	bp.mapM.Lock()
-	for topic, m := range bp.messages {
-		for partition, messages := range m {
-			if p.tryAcquireDeliveryLock(topic, partition) {
-
-				messagesToSend = append(messagesToSend, messages...)
-				m[partition] = nil
-
-			}
-		}
-	}
-	bp.mapM.Unlock()
-
-	go bp.flushMessages(p, messagesToSend)
-}
-
-func (bp *brokerProducer) flushMessages(p *MultiProducer, messages []*produceMessage) {
-	if len(messages) == 0 {
-		return
-	}
-
-	req := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: p.config.Timeout}
-	for _, pmsg := range messages {
-		msg := &Message{Codec: p.config.Compression, Key: pmsg.key, Value: pmsg.value}
-		req.AddMessage(pmsg.topic, pmsg.partition, msg)
-	}
-
-	bp.flushRequest(p, req, messages)
-}
-
-func (bp *brokerProducer) Close() error {
-	close(bp.stopper)
-	return nil
-}
-
-func (bp *brokerProducer) flushRequest(p *MultiProducer, request *ProduceRequest, messages []*produceMessage) {
-	response, err := bp.broker.Produce(p.client.id, request)
-
-	switch err {
-	case nil:
-		break
-	case EncodingError:
-		// No sense in retrying; it'll just fail again. But what about all the other
-		// messages that weren't invalid? Really, this is a "shit's broke real good"
-		// scenario, so angrily logging it and moving on is probably acceptable.
-		p.errors <- err
-		goto releaseAllLocks
-	default:
-		// TODO: Now we have to sift through the messages and determine which should be retried.
-
-		p.client.disconnectBroker(bp.broker)
-		bp.Close()
-
-		// ie. for msg := range reverse(messages)
-		for i := len(messages) - 1; i >= 0; i-- {
-			msg := messages[i]
-			if msg.failures < p.config.MaxDeliveryRetries {
-				msg.failures++
-				// Passing isRetry=true causes the message to happen before other queued messages.
-				// This is also why we have to iterate backwards through the failed messages --
-				// to preserve ordering, we have to prepend the items starting from the last one.
-				p.addMessage(msg, true)
-			} else {
-				// log about message failing too many times?
-			}
-		}
-		goto releaseAllLocks
-	}
-
-	// When does this ever actually happen, and why don't we explode when it does?
-	// This seems bad.
-	if response == nil {
-		p.errors <- nil
-		goto releaseAllLocks
-	}
-
-	for topic, d := range response.Blocks {
-		for partition, block := range d {
-			if block == nil {
-				// IncompleteResponse. Here we just drop all the messages; we don't know whether
-				// they were successfully sent or not. Non-ideal, but how often does it happen?
-				// Log angrily.
-			}
-			switch block.Err {
-			case NoError:
-				// All the messages for this topic-partition were delivered successfully!
-				// Unlock delivery for this topic-partition and discard the produceMessage objects.
-				p.errors <- nil
-			case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
-				// TODO: should we refresh metadata for this topic?
-
-				// ie. for msg := range reverse(messages)
-				for i := len(messages) - 1; i >= 0; i-- {
-					msg := messages[i]
-					if msg.topic == topic && msg.partition == partition {
-						if msg.failures < p.config.MaxDeliveryRetries {
-							msg.failures++
-							// Passing isRetry=true causes the message to happen before other queued messages.
-							// This is also why we have to iterate backwards through the failed messages --
-							// to preserve ordering, we have to prepend the items starting from the last one.
-							p.addMessage(msg, true)
-						} else {
-							// dropping message; log angrily maybe.
-						}
-					}
-				}
-			default:
-				// non-retriable error. Drop the messages and log angrily.
-			}
-			p.releaseDeliveryLock(topic, partition)
-		}
-	}
-
-	return
-
-releaseAllLocks:
-	// This is slow, but only happens on rare error conditions.
-
-	tps := make(map[string]map[int32]bool)
-	for _, msg := range messages {
-		forTopic, ok := tps[msg.topic]
-		if !ok {
-			forTopic = make(map[int32]bool)
-			tps[msg.topic] = forTopic
-		}
-		forTopic[msg.partition] = true
-	}
-
-	for topic, d := range tps {
-		for partition := range d {
-			p.releaseDeliveryLock(topic, partition)
-		}
-	}
-}
-
-func (p *MultiProducer) tryAcquireDeliveryLock(topic string, partition int32) bool {
-	tp := topicPartition{topic, partition}
-	p.dm.RLock()
-	ch, ok := p.deliveryLocks[tp]
-	p.dm.RUnlock()
-	if !ok {
-		p.dm.Lock()
-		ch, ok = p.deliveryLocks[tp]
-		if !ok {
-			ch = make(chan bool, 1)
-			p.deliveryLocks[tp] = ch
-		}
-		p.dm.Unlock()
-	}
-
-	select {
-	case ch <- true:
-		return true
-	default:
-		return false
-	}
-}
-
-func (p *MultiProducer) releaseDeliveryLock(topic string, partition int32) {
-	p.dm.RLock()
-	ch := p.deliveryLocks[topicPartition{topic, partition}]
-	p.dm.RUnlock()
-	select {
-	case <-ch:
-	default:
-		panic("Serious logic bug: releaseDeliveryLock called without acquiring lock first.")
-	}
-}

+ 0 - 234
multiproducer_test.go

@@ -1,234 +0,0 @@
-package sarama
-
-import (
-	"encoding/binary"
-	"fmt"
-	"testing"
-	"time"
-)
-
-func TestSimpleMultiProducer(t *testing.T) {
-	responses := make(chan []byte, 1)
-	extraResponses := make(chan []byte)
-	mockBroker := NewMockBroker(t, responses)
-	mockExtra := NewMockBroker(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	responses <- response
-	go func() {
-		msg := []byte{
-			0x00, 0x00, 0x00, 0x01,
-			0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-			0x00, 0x00, 0x00, 0x01,
-			0x00, 0x00, 0x00, 0x00,
-			0x00, 0x00,
-			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
-		binary.BigEndian.PutUint64(msg[23:], 0)
-		extraResponses <- msg
-	}()
-
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	producer, err := NewMultiProducer(client, &MultiProducerConfig{
-		RequiredAcks:  WaitForLocal,
-		MaxBufferTime: 1000000, // "never"
-		// So that we flush once, after the 10th message.
-		MaxBufferBytes: uint32((len("ABC THE MESSAGE") * 10) - 1),
-	})
-	defer producer.Close()
-
-	for i := 0; i < 10; i++ {
-		err = producer.SendMessage("my_topic", nil, StringEncoder("ABC THE MESSAGE"))
-		if err != nil {
-			t.Error(err)
-		}
-	}
-
-	readMessage(t, producer.Errors())
-	assertNoMessages(t, producer.Errors())
-
-	// TODO: This doesn't really test that we ONLY flush once.
-	// For example, change the MaxBufferBytes to be much lower.
-}
-
-func TestMultipleMultiProducer(t *testing.T) {
-	responses := make(chan []byte, 1)
-	responsesA := make(chan []byte)
-	responsesB := make(chan []byte)
-	mockBroker := NewMockBroker(t, responses)
-	mockBrokerA := NewMockBroker(t, responsesA)
-	mockBrokerB := NewMockBroker(t, responsesB)
-	defer mockBroker.Close()
-	defer mockBrokerA.Close()
-	defer mockBrokerB.Close()
-
-	// We're going to return:
-	// topic: topic_a; partition: 0; brokerID: 1
-	// topic: topic_b; partition: 0; brokerID: 2
-	// topic: topic_c; partition: 0; brokerID: 2
-
-	// Return the extra broker metadata so that the producer will send
-	// requests to mockBrokerA and mockBrokerB.
-	response := []byte{
-		0x00, 0x00, 0x00, 0x02, // 0:3 number of brokers
-
-		0x00, 0x00, 0x00, 0x01, // 4:7 broker ID
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 8:18 hostname
-		0xFF, 0xFF, 0xFF, 0xFF, // 19:22 port will be written here.
-
-		0x00, 0x00, 0x00, 0x02, // 23:26 broker ID
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 27:37 hostname
-		0xFF, 0xFF, 0xFF, 0xFF, // 38:41 port will be written here.
-
-		0x00, 0x00, 0x00, 0x03, // number of topic metadata records
-
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x00, 0x00, 0x00, // partition ID
-		0x00, 0x00, 0x00, 0x01, // broker ID of leader
-		0x00, 0x00, 0x00, 0x00, // replica set
-		0x00, 0x00, 0x00, 0x00, // ISR set
-
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x00, 0x00, 0x00, // partition ID
-		0x00, 0x00, 0x00, 0x02, // broker ID of leader
-		0x00, 0x00, 0x00, 0x00, // replica set
-		0x00, 0x00, 0x00, 0x00, // ISR set
-
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // topic name
-		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
-		0x00, 0x00, // error: 0 means no error
-		0x00, 0x00, 0x00, 0x00, // partition ID
-		0x00, 0x00, 0x00, 0x02, // broker ID of leader
-		0x00, 0x00, 0x00, 0x00, // replica set
-		0x00, 0x00, 0x00, 0x00, // ISR set
-
-	}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockBrokerA.Port()))
-	binary.BigEndian.PutUint32(response[38:], uint32(mockBrokerB.Port()))
-	responses <- response
-
-	go func() {
-		msg := []byte{
-			0x00, 0x00, 0x00, 0x01, // 0:3 number of topics
-			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // 4:12 topic name
-			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
-			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
-			0x00, 0x00, // 21:22 error: 0 means no error
-			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
-		}
-		binary.BigEndian.PutUint64(msg[23:], 0)
-		responsesA <- msg
-	}()
-
-	go func() {
-		msg := []byte{
-			0x00, 0x00, 0x00, 0x02, // 0:3 number of topics
-
-			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // 4:12 topic name
-			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
-			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
-			0x00, 0x00, // 21:22 error: 0 means no error
-			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
-
-			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // 4:12 topic name
-			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
-			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
-			0x00, 0x00, // 21:22 error: 0 means no error
-			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
-		}
-		binary.BigEndian.PutUint64(msg[23:], 0)
-		responsesB <- msg
-	}()
-
-	// TODO: Submit events to 3 different topics on 2 different brokers.
-	// Need to figure out how the request format works to return the broker
-	// info for those two new brokers, and how to return multiple blocks in
-	// a ProduceRespose
-
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	producer, err := NewMultiProducer(client, &MultiProducerConfig{
-		RequiredAcks:  WaitForLocal,
-		MaxBufferTime: 1000000, // "never"
-		// So that we flush once, after the 10th message.
-		MaxBufferBytes: uint32((len("ABC THE MESSAGE") * 10) - 1),
-	})
-	defer producer.Close()
-
-	for i := 0; i < 10; i++ {
-		err = producer.SendMessage("topic_a", nil, StringEncoder("ABC THE MESSAGE"))
-		if err != nil {
-			t.Error(err)
-		}
-	}
-
-	for i := 0; i < 5; i++ {
-		err = producer.SendMessage("topic_b", nil, StringEncoder("ABC THE MESSAGE"))
-		if err != nil {
-			t.Error(err)
-		}
-	}
-
-	for i := 0; i < 5; i++ {
-		err = producer.SendMessage("topic_c", nil, StringEncoder("ABC THE MESSAGE"))
-		if err != nil {
-			t.Error(err)
-		}
-	}
-
-	// read three messages for topics A, B, and C. Assert they are nil.
-	readMessage(t, producer.Errors())
-	readMessage(t, producer.Errors())
-	readMessage(t, producer.Errors())
-
-	assertNoMessages(t, producer.Errors())
-}
-
-func readMessage(t *testing.T, ch chan error) {
-	select {
-	case err := <-ch:
-		if err != nil {
-			t.Error(err)
-		}
-	case <-time.After(1 * time.Second):
-		t.Error(fmt.Errorf("Message was never received"))
-	}
-}
-
-func assertNoMessages(t *testing.T, ch chan error) {
-	select {
-	case <-ch:
-		t.Error(fmt.Errorf("too many values returned"))
-	case <-time.After(5 * time.Millisecond):
-	}
-}

+ 399 - 36
producer.go

@@ -1,61 +1,424 @@
 package sarama
 
-// ProducerConfig is used to pass multiple configuration options to NewProducer.
+import (
+	"sync"
+	"time"
+)
+
 type ProducerConfig struct {
-	Partitioner  Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
-	RequiredAcks RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
-	Timeout      int32            // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks.
-	Compression  CompressionCodec // The type of compression to use on messages (defaults to no compression).
+	Partitioner        Partitioner
+	RequiredAcks       RequiredAcks
+	Timeout            int32
+	Compression        CompressionCodec
+	MaxBufferBytes     uint32
+	MaxBufferTime      uint32
+	MaxDeliveryRetries uint32
 }
 
-// Producer publishes Kafka messages on a given topic. It routes messages to the correct broker, refreshing metadata as appropriate,
-// and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
-// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
 type Producer struct {
-	mp    MultiProducer
-	topic string
+	client          *Client
+	config          ProducerConfig
+	brokerProducers map[*Broker]*brokerProducer
+	m               sync.RWMutex
+	errors          chan error
+	deliveryLocks   map[topicPartition]chan bool
+	dm              sync.RWMutex
+}
+
+type brokerProducer struct {
+	mapM          sync.Mutex
+	messages      map[string]map[int32][]*produceMessage
+	bufferedBytes uint32
+	flushNow      chan bool
+	broker        *Broker
+	stopper       chan bool
+	hasMessages   chan bool
+}
+
+type produceMessage struct {
+	topic      string
+	partition  int32
+	key, value []byte
+	failures   uint32
+}
+
+type topicPartition struct {
+	topic     string
+	partition int32
 }
 
-// NewProducer creates a new Producer using the given client. The resulting producer will publish messages on the given topic.
-func NewProducer(client *Client, topic string, config *ProducerConfig) (*Producer, error) {
+func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 	if config == nil {
 		config = new(ProducerConfig)
 	}
 
-	mpc := MultiProducerConfig{
-		Partitioner:    config.Partitioner,
-		RequiredAcks:   config.RequiredAcks,
-		Timeout:        config.Timeout,
-		Compression:    config.Compression,
-		MaxBufferBytes: 0, // synchronous
-		MaxBufferTime:  0, // synchronous
+	if config.RequiredAcks < -1 {
+		return nil, ConfigurationError("Invalid RequiredAcks")
 	}
-	mp, err := NewMultiProducer(client, &mpc)
-	if err != nil {
-		return nil, err
+
+	if config.Timeout < 0 {
+		return nil, ConfigurationError("Invalid Timeout")
 	}
 
-	if topic == "" {
-		return nil, ConfigurationError("Empty topic")
+	if config.Partitioner == nil {
+		config.Partitioner = NewRandomPartitioner()
 	}
 
-	p := &Producer{
-		topic: topic,
-		mp:    *mp,
+	if config.MaxBufferBytes == 0 {
+		config.MaxBufferBytes = 1
 	}
 
-	return p, nil
+	return &Producer{
+		client:          client,
+		config:          *config,
+		errors:          make(chan error, 16),
+		deliveryLocks:   make(map[topicPartition]chan bool),
+		brokerProducers: make(map[*Broker]*brokerProducer),
+	}, nil
+}
+
+func (p *Producer) Errors() chan error {
+	if p.isSynchronous() {
+		panic("use of Errors() is not permitted in synchronous mode.")
+	} else {
+		return p.errors
+	}
 }
 
-// Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
-// a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
-// on the underlying client.
 func (p *Producer) Close() error {
-	return p.mp.Close()
+	return nil
+}
+
+func (p *Producer) SendMessage(topic string, key, value Encoder) (err error) {
+	var keyBytes, valBytes []byte
+
+	if key != nil {
+		if keyBytes, err = key.Encode(); err != nil {
+			return err
+		}
+	}
+	if value != nil {
+		if valBytes, err = value.Encode(); err != nil {
+			return err
+		}
+	}
+
+	partition, err := p.choosePartition(topic, key)
+	if err != nil {
+		return err
+	}
+
+	msg := &produceMessage{
+		topic:     topic,
+		partition: partition,
+		key:       keyBytes,
+		value:     valBytes,
+		failures:  0,
+	}
+
+	return p.addMessage(msg, false)
+}
+
+func (p *Producer) choosePartition(topic string, key Encoder) (int32, error) {
+	partitions, err := p.client.Partitions(topic)
+	if err != nil {
+		return -1, err
+	}
+
+	numPartitions := int32(len(partitions))
+
+	choice := p.config.Partitioner.Partition(key, numPartitions)
+
+	if choice < 0 || choice >= numPartitions {
+		return -1, InvalidPartition
+	}
+
+	return partitions[choice], nil
+}
+
+func (p *Producer) addMessage(msg *produceMessage, isRetry bool) error {
+	broker, err := p.client.Leader(msg.topic, msg.partition)
+	if err != nil {
+		return err
+	}
+
+	bp := p.brokerProducerFor(broker)
+	bp.addMessage(msg, p.config.MaxBufferBytes, isRetry)
+
+	if p.isSynchronous() {
+		return <-p.errors
+	}
+	return nil
+}
+
+func (p *Producer) isSynchronous() bool {
+	return p.config.MaxBufferBytes < 2 && p.config.MaxBufferTime == 0
+}
+
+func (p *Producer) brokerProducerFor(broker *Broker) *brokerProducer {
+	p.m.RLock()
+	bp, ok := p.brokerProducers[broker]
+	p.m.RUnlock()
+	if !ok {
+		p.m.Lock()
+		bp, ok = p.brokerProducers[broker]
+		if !ok {
+			bp = p.newBrokerProducer(broker)
+			p.brokerProducers[broker] = bp
+		}
+		p.m.Unlock()
+	}
+	return bp
+}
+
+func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
+	bp := &brokerProducer{
+		messages:    make(map[string]map[int32][]*produceMessage),
+		flushNow:    make(chan bool, 1),
+		broker:      broker,
+		stopper:     make(chan bool),
+		hasMessages: make(chan bool, 1),
+	}
+
+	maxBufferTime := time.Duration(p.config.MaxBufferTime) * time.Millisecond
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	go func() {
+		timer := time.NewTimer(maxBufferTime)
+		wg.Done()
+		for {
+			select {
+			case <-bp.flushNow:
+				bp.flush(p)
+			case <-timer.C:
+				bp.flush(p)
+			case <-bp.stopper:
+				p.m.Lock()
+				delete(p.brokerProducers, bp.broker)
+				p.m.Unlock()
+				bp.flush(p)
+				p.client.disconnectBroker(bp.broker)
+				close(bp.flushNow)
+				close(bp.hasMessages)
+				return
+			}
+			timer.Reset(maxBufferTime)
+		}
+	}()
+	wg.Wait()
+
+	return bp
 }
 
-// SendMessage sends a message with the given key and value. The partition to send to is selected by the Producer's Partitioner.
-// To send strings as either key or value, see the StringEncoder type.
-func (p *Producer) SendMessage(key, value Encoder) error {
-	return p.mp.SendMessage(p.topic, key, value)
+func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32, isRetry bool) {
+	bp.mapM.Lock()
+	forTopic, ok := bp.messages[msg.topic]
+	if !ok {
+		forTopic = make(map[int32][]*produceMessage)
+		bp.messages[msg.topic] = forTopic
+	}
+	if isRetry {
+		// Prepend: Deliver first.
+		forTopic[msg.partition] = append([]*produceMessage{msg}, forTopic[msg.partition]...)
+	} else {
+		// Append
+		forTopic[msg.partition] = append(forTopic[msg.partition], msg)
+	}
+	bp.bufferedBytes += uint32(len(msg.key) + len(msg.value))
+
+	select {
+	case bp.hasMessages <- true:
+	default:
+	}
+
+	bp.mapM.Unlock()
+	if bp.bufferedBytes > maxBufferBytes {
+		// TODO: decrement this later on
+		bp.tryFlush()
+	}
+}
+
+func (bp *brokerProducer) tryFlush() {
+	select {
+	case bp.flushNow <- true:
+	default:
+	}
+}
+
+func (bp *brokerProducer) flush(p *Producer) {
+	// try to acquire delivery locks for each topic-partition involved.
+
+	var messagesToSend []*produceMessage
+
+	<-bp.hasMessages // wait for a message if the BP currently has none.
+
+	bp.mapM.Lock()
+	for topic, m := range bp.messages {
+		for partition, messages := range m {
+			if p.tryAcquireDeliveryLock(topic, partition) {
+
+				messagesToSend = append(messagesToSend, messages...)
+				m[partition] = nil
+
+			}
+		}
+	}
+	bp.mapM.Unlock()
+
+	go bp.flushMessages(p, messagesToSend)
+}
+
+func (bp *brokerProducer) flushMessages(p *Producer, messages []*produceMessage) {
+	if len(messages) == 0 {
+		return
+	}
+
+	req := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: p.config.Timeout}
+	for _, pmsg := range messages {
+		msg := &Message{Codec: p.config.Compression, Key: pmsg.key, Value: pmsg.value}
+		req.AddMessage(pmsg.topic, pmsg.partition, msg)
+	}
+
+	bp.flushRequest(p, req, messages)
+}
+
+func (bp *brokerProducer) Close() error {
+	close(bp.stopper)
+	return nil
+}
+
+func (bp *brokerProducer) flushRequest(p *Producer, request *ProduceRequest, messages []*produceMessage) {
+	response, err := bp.broker.Produce(p.client.id, request)
+
+	switch err {
+	case nil:
+		break
+	case EncodingError:
+		// No sense in retrying; it'll just fail again. But what about all the other
+		// messages that weren't invalid? Really, this is a "shit's broke real good"
+		// scenario, so angrily logging it and moving on is probably acceptable.
+		p.errors <- err
+		goto releaseAllLocks
+	default:
+		// TODO: Now we have to sift through the messages and determine which should be retried.
+
+		p.client.disconnectBroker(bp.broker)
+		bp.Close()
+
+		// ie. for msg := range reverse(messages)
+		for i := len(messages) - 1; i >= 0; i-- {
+			msg := messages[i]
+			if msg.failures < p.config.MaxDeliveryRetries {
+				msg.failures++
+				// Passing isRetry=true causes the message to be queued ahead of other pending messages.
+				// This is also why we have to iterate backwards through the failed messages --
+				// to preserve ordering, we have to prepend the items starting from the last one.
+				p.addMessage(msg, true)
+			} else {
+				// log about message failing too many times?
+			}
+		}
+		goto releaseAllLocks
+	}
+
+	// When does this ever actually happen, and why don't we explode when it does?
+	// This seems bad.
+	if response == nil {
+		p.errors <- nil
+		goto releaseAllLocks
+	}
+
+	for topic, d := range response.Blocks {
+		for partition, block := range d {
+			if block == nil {
+				// IncompleteResponse. Here we just drop all the messages; we don't know whether
+				// they were successfully sent or not. Non-ideal, but how often does it happen?
+				// Log angrily.
+			}
+			switch block.Err {
+			case NoError:
+				// All the messages for this topic-partition were delivered successfully!
+				// Unlock delivery for this topic-partition and discard the produceMessage objects.
+				p.errors <- nil
+			case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
+				// TODO: should we refresh metadata for this topic?
+
+				// ie. for msg := range reverse(messages)
+				for i := len(messages) - 1; i >= 0; i-- {
+					msg := messages[i]
+					if msg.topic == topic && msg.partition == partition {
+						if msg.failures < p.config.MaxDeliveryRetries {
+							msg.failures++
+							// Passing isRetry=true causes the message to be queued ahead of other pending messages.
+							// This is also why we have to iterate backwards through the failed messages --
+							// to preserve ordering, we have to prepend the items starting from the last one.
+							p.addMessage(msg, true)
+						} else {
+							// dropping message; log angrily maybe.
+						}
+					}
+				}
+			default:
+				// non-retriable error. Drop the messages and log angrily.
+			}
+			p.releaseDeliveryLock(topic, partition)
+		}
+	}
+
+	return
+
+releaseAllLocks:
+	// This is slow, but only happens on rare error conditions.
+
+	tps := make(map[string]map[int32]bool)
+	for _, msg := range messages {
+		forTopic, ok := tps[msg.topic]
+		if !ok {
+			forTopic = make(map[int32]bool)
+			tps[msg.topic] = forTopic
+		}
+		forTopic[msg.partition] = true
+	}
+
+	for topic, d := range tps {
+		for partition := range d {
+			p.releaseDeliveryLock(topic, partition)
+		}
+	}
+}
+
+func (p *Producer) tryAcquireDeliveryLock(topic string, partition int32) bool {
+	tp := topicPartition{topic, partition}
+	p.dm.RLock()
+	ch, ok := p.deliveryLocks[tp]
+	p.dm.RUnlock()
+	if !ok {
+		p.dm.Lock()
+		ch, ok = p.deliveryLocks[tp]
+		if !ok {
+			ch = make(chan bool, 1)
+			p.deliveryLocks[tp] = ch
+		}
+		p.dm.Unlock()
+	}
+
+	select {
+	case ch <- true:
+		return true
+	default:
+		return false
+	}
+}
+
+func (p *Producer) releaseDeliveryLock(topic string, partition int32) {
+	p.dm.RLock()
+	ch := p.deliveryLocks[topicPartition{topic, partition}]
+	p.dm.RUnlock()
+	select {
+	case <-ch:
+	default:
+		panic("Serious logic bug: releaseDeliveryLock called without acquiring lock first.")
+	}
 }
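
The renamed Producer keeps the buffering behaviour that used to be MultiProducer-only: with MaxBufferBytes above 1 or a nonzero MaxBufferTime it runs asynchronously, SendMessage returns once the message is queued, and per-batch delivery results come back on Errors() (which, as the code above shows, panics when the producer is synchronous). A hedged sketch of buffered use; the import path, broker address, and buffer sizes are illustrative assumptions, not part of this commit:

package main

import (
	"log"

	"github.com/Shopify/sarama" // assumed import path
)

func main() {
	client, err := sarama.NewClient("client_id", []string{"localhost:9092"}, nil) // address assumed
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Buffer up to ~64 KiB per broker, or flush every 500 ms, whichever comes first.
	producer, err := sarama.NewProducer(client, &sarama.ProducerConfig{
		RequiredAcks:       sarama.WaitForLocal,
		MaxBufferBytes:     64 * 1024,
		MaxBufferTime:      500, // milliseconds
		MaxDeliveryRetries: 3,
	})
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// Drain delivery results: one value per flushed topic-partition batch, nil on success.
	go func() {
		for err := range producer.Errors() {
			if err != nil {
				// A flush failed outright (e.g. an encoding error); retriable failures are
				// re-queued internally up to MaxDeliveryRetries and do not surface here.
				log.Println("produce error:", err)
			}
		}
	}()

	// Returns as soon as the message is buffered rather than when it is delivered.
	if err := producer.SendMessage("my_topic", nil, sarama.StringEncoder("buffered message")); err != nil {
		log.Println("buffering error:", err)
	}
}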

+ 184 - 16
producer_test.go

@@ -4,6 +4,7 @@ import (
 	"encoding/binary"
 	"fmt"
 	"testing"
+	"time"
 )
 
 func TestSimpleProducer(t *testing.T) {
@@ -32,37 +33,204 @@ func TestSimpleProducer(t *testing.T) {
 	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
 	responses <- response
 	go func() {
-		for i := 0; i < 10; i++ {
-			msg := []byte{
-				0x00, 0x00, 0x00, 0x01,
-				0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-				0x00, 0x00, 0x00, 0x01,
-				0x00, 0x00, 0x00, 0x00,
-				0x00, 0x00,
-				0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
-			binary.BigEndian.PutUint64(msg[23:], uint64(i))
-			extraResponses <- msg
-		}
+		msg := []byte{
+			0x00, 0x00, 0x00, 0x01,
+			0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
+			0x00, 0x00, 0x00, 0x01,
+			0x00, 0x00, 0x00, 0x00,
+			0x00, 0x00,
+			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
+		binary.BigEndian.PutUint64(msg[23:], 0)
+		extraResponses <- msg
 	}()
 
 	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer client.Close()
 
-	producer, err := NewProducer(client, "my_topic", &ProducerConfig{RequiredAcks: WaitForLocal})
+	producer, err := NewProducer(client, &ProducerConfig{
+		RequiredAcks:  WaitForLocal,
+		MaxBufferTime: 1000000, // "never"
+		// So that we flush once, after the 10th message.
+		MaxBufferBytes: uint32((len("ABC THE MESSAGE") * 10) - 1),
+	})
+	defer producer.Close()
+
+	for i := 0; i < 10; i++ {
+		err = producer.SendMessage("my_topic", nil, StringEncoder("ABC THE MESSAGE"))
+		if err != nil {
+			t.Error(err)
+		}
+	}
+
+	readMessage(t, producer.Errors())
+	assertNoMessages(t, producer.Errors())
+
+	// TODO: This doesn't really test that we ONLY flush once.
+	// For example, change the MaxBufferBytes to be much lower.
+}
+
+func TestMultipleProducer(t *testing.T) {
+	responses := make(chan []byte, 1)
+	responsesA := make(chan []byte)
+	responsesB := make(chan []byte)
+	mockBroker := NewMockBroker(t, responses)
+	mockBrokerA := NewMockBroker(t, responsesA)
+	mockBrokerB := NewMockBroker(t, responsesB)
+	defer mockBroker.Close()
+	defer mockBrokerA.Close()
+	defer mockBrokerB.Close()
+
+	// We're going to return:
+	// topic: topic_a; partition: 0; brokerID: 1
+	// topic: topic_b; partition: 0; brokerID: 2
+	// topic: topic_c; partition: 0; brokerID: 2
+
+	// Return the extra broker metadata so that the producer will send
+	// requests to mockBrokerA and mockBrokerB.
+	response := []byte{
+		0x00, 0x00, 0x00, 0x02, // 0:3 number of brokers
+
+		0x00, 0x00, 0x00, 0x01, // 4:7 broker ID
+		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 8:18 hostname
+		0xFF, 0xFF, 0xFF, 0xFF, // 19:22 port will be written here.
+
+		0x00, 0x00, 0x00, 0x02, // 23:26 broker ID
+		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // 27:37 hostname
+		0xFF, 0xFF, 0xFF, 0xFF, // 38:41 port will be written here.
+
+		0x00, 0x00, 0x00, 0x03, // number of topic metadata records
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x01, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x02, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of partition metadata records for this topic
+		0x00, 0x00, // error: 0 means no error
+		0x00, 0x00, 0x00, 0x00, // partition ID
+		0x00, 0x00, 0x00, 0x02, // broker ID of leader
+		0x00, 0x00, 0x00, 0x00, // replica set
+		0x00, 0x00, 0x00, 0x00, // ISR set
+
+	}
+	binary.BigEndian.PutUint32(response[19:], uint32(mockBrokerA.Port()))
+	binary.BigEndian.PutUint32(response[38:], uint32(mockBrokerB.Port()))
+	responses <- response
+
+	go func() {
+		msg := []byte{
+			0x00, 0x00, 0x00, 0x01, // 0:3 number of topics
+			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'a', // 4:12 topic name
+			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
+			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
+			0x00, 0x00, // 21:22 error: 0 means no error
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
+		}
+		binary.BigEndian.PutUint64(msg[23:], 0)
+		responsesA <- msg
+	}()
+
+	go func() {
+		msg := []byte{
+			0x00, 0x00, 0x00, 0x02, // 0:3 number of topics
+
+			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'b', // 4:12 topic name
+			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
+			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
+			0x00, 0x00, // 21:22 error: 0 means no error
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
+
+			0x00, 0x07, 't', 'o', 'p', 'i', 'c', '_', 'c', // 4:12 topic name
+			0x00, 0x00, 0x00, 0x01, // 13:16 number of blocks for this topic
+			0x00, 0x00, 0x00, 0x00, // 17:20 partition id
+			0x00, 0x00, // 21:22 error: 0 means no error
+			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // 23:30 offset
+		}
+		binary.BigEndian.PutUint64(msg[23:], 0)
+		responsesB <- msg
+	}()
+
+	// TODO: Submit events to 3 different topics on 2 different brokers.
+	// Need to figure out how the request format works to return the broker
+	// info for those two new brokers, and how to return multiple blocks in
+	// a ProduceResponse.
+
+	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
+
+	producer, err := NewProducer(client, &ProducerConfig{
+		RequiredAcks:  WaitForLocal,
+		MaxBufferTime: 1000000, // "never"
+		// So that we flush once, after the 10th message.
+		MaxBufferBytes: uint32((len("ABC THE MESSAGE") * 10) - 1),
+	})
 	defer producer.Close()
 
 	for i := 0; i < 10; i++ {
-		err = producer.SendMessage(nil, StringEncoder("ABC THE MESSAGE"))
+		err = producer.SendMessage("topic_a", nil, StringEncoder("ABC THE MESSAGE"))
+		if err != nil {
+			t.Error(err)
+		}
+	}
+
+	for i := 0; i < 5; i++ {
+		err = producer.SendMessage("topic_b", nil, StringEncoder("ABC THE MESSAGE"))
 		if err != nil {
 			t.Error(err)
 		}
 	}
+
+	for i := 0; i < 5; i++ {
+		err = producer.SendMessage("topic_c", nil, StringEncoder("ABC THE MESSAGE"))
+		if err != nil {
+			t.Error(err)
+		}
+	}
+
+	// read three messages for topics A, B, and C. Assert they are nil.
+	readMessage(t, producer.Errors())
+	readMessage(t, producer.Errors())
+	readMessage(t, producer.Errors())
+
+	assertNoMessages(t, producer.Errors())
+}
+
+func readMessage(t *testing.T, ch chan error) {
+	select {
+	case err := <-ch:
+		if err != nil {
+			t.Error(err)
+		}
+	case <-time.After(1 * time.Second):
+		t.Error(fmt.Errorf("Message was never received"))
+	}
+}
+
+func assertNoMessages(t *testing.T, ch chan error) {
+	select {
+	case <-ch:
+		t.Error(fmt.Errorf("too many values returned"))
+	case <-time.After(5 * time.Millisecond):
+	}
 }
 
 func ExampleProducer() {
@@ -74,13 +242,13 @@ func ExampleProducer() {
 	}
 	defer client.Close()
 
-	producer, err := NewProducer(client, "my_topic", &ProducerConfig{RequiredAcks: WaitForLocal})
+	producer, err := NewProducer(client, &ProducerConfig{RequiredAcks: WaitForLocal})
 	if err != nil {
 		panic(err)
 	}
 	defer producer.Close()
 
-	err = producer.SendMessage(nil, StringEncoder("testing 123"))
+	err = producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
 	if err != nil {
 		panic(err)
 	} else {