
New Producer design

Evan Huus, 11 years ago
parent
commit
d5448f1b9a
8 files changed, 764 additions and 787 deletions
  1. errors.go (+3 -0)
  2. functional_test.go (+53 -6)
  3. produce_message.go (+0 -105)
  4. produce_request.go (+12 -0)
  5. producer.go (+578 -390)
  6. producer_test.go (+60 -285)
  7. simple_producer.go (+47 -0)
  8. utils.go (+11 -1)

+ 3 - 0
errors.go

@@ -38,6 +38,9 @@ var EncodingError = errors.New("kafka: Error while encoding packet.")
 // of the message set.
 var InsufficientData = errors.New("kafka: Insufficient data to decode packet, more bytes expected.")
 
+// ShuttingDown is returned when a producer receives a message during shutdown.
+var ShuttingDown = errors.New("kafka: Message received by producer in process of shutting down.")
+
 // DecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
 // This can be a bad CRC or length field, or any other invalid value.
 type DecodingError struct {
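
The new ShuttingDown sentinel lets callers tell messages rejected mid-shutdown apart from genuine delivery failures. A minimal sketch of a drain loop that makes that distinction, written as in-package code the way the test examples are (the producer variable is hypothetical):

	for pe := range producer.Errors() {
		switch {
		case pe.Err == nil:
			// delivered successfully (only seen when AckSuccesses is enabled)
		case pe.Err == ShuttingDown:
			// rejected because Close() was already in progress
		default:
			Logger.Println("delivery failed:", pe.Err)
		}
	}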

+ 53 - 6
functional_test.go

@@ -42,7 +42,37 @@ func checkKafkaAvailability(t *testing.T) {
 	}
 }
 
-func TestProducingMessages(t *testing.T) {
+func TestFuncProducing(t *testing.T) {
+	config := NewProducerConfig()
+	testProducingMessages(t, config)
+}
+
+func TestFuncProducingGzip(t *testing.T) {
+	config := NewProducerConfig()
+	config.Compression = CompressionGZIP
+	testProducingMessages(t, config)
+}
+
+func TestFuncProducingSnappy(t *testing.T) {
+	config := NewProducerConfig()
+	config.Compression = CompressionSnappy
+	testProducingMessages(t, config)
+}
+
+func TestFuncProducingNoResponse(t *testing.T) {
+	config := NewProducerConfig()
+	config.RequiredAcks = NoResponse
+	testProducingMessages(t, config)
+}
+
+func TestFuncProducingFlushing(t *testing.T) {
+	config := NewProducerConfig()
+	config.FlushMsgCount = TestBatchSize / 8
+	config.FlushFrequency = 250 * time.Millisecond
+	testProducingMessages(t, config)
+}
+
+func testProducingMessages(t *testing.T, config *ProducerConfig) {
 	checkKafkaAvailability(t)
 
 	client, err := NewClient("functional_test", []string{kafkaAddr}, nil)
@@ -60,15 +90,32 @@ func TestProducingMessages(t *testing.T) {
 	}
 	defer consumer.Close()
 
-	producer, err := NewProducer(client, nil)
+	config.AckSuccesses = true
+	producer, err := NewProducer(client, config)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	for i := 1; i <= TestBatchSize; i++ {
-		err = producer.SendMessage("single_partition", nil, StringEncoder(fmt.Sprintf("testing %d", i)))
-		if err != nil {
-			t.Fatal(err)
+	expectedResponses := TestBatchSize
+	for i := 1; i <= TestBatchSize; {
+		msg := &MessageToSend{Topic: "single_partition", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
+		select {
+		case producer.Input() <- msg:
+			i++
+		case ret := <-producer.Errors():
+			if ret.Err == nil {
+				expectedResponses--
+			} else {
+				t.Fatal(ret.Err)
+			}
+		}
+	}
+	for expectedResponses > 0 {
+		ret := <-producer.Errors()
+		if ret.Err == nil {
+			expectedResponses--
+		} else {
+			t.Fatal(ret.Err)
 		}
 	}
 	producer.Close()
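
With AckSuccesses enabled, every message eventually produces exactly one event on Errors(), so the test counts acknowledgements rather than sends, and interleaves sending with draining so the producer never blocks. The same idea as a small hedged helper (sendAll is illustrative, not part of the commit):

	// sendAll pushes msgs into the producer while draining Errors(), and
	// returns once every message has been acknowledged or a delivery fails.
	// Assumes the producer was built with AckSuccesses = true.
	func sendAll(p *Producer, msgs []*MessageToSend) error {
		sent, acked := 0, 0
		for acked < len(msgs) {
			if sent < len(msgs) {
				select {
				case p.Input() <- msgs[sent]:
					sent++
				case ret := <-p.Errors():
					acked++
					if ret.Err != nil {
						return ret.Err
					}
				}
			} else {
				ret := <-p.Errors()
				acked++
				if ret.Err != nil {
					return ret.Err
				}
			}
		}
		return nil
	}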

+ 0 - 105
produce_message.go

@@ -1,105 +0,0 @@
-package sarama
-
-import (
-	"log"
-	"time"
-)
-
-type produceMessage struct {
-	tp         topicPartition
-	key, value []byte
-	retried    bool
-	sync       bool
-}
-
-type produceRequestBuilder []*produceMessage
-
-// If the message is synchronous, we manually send it and wait for a return.
-// Otherwise, we just hand it back to the producer to enqueue using the normal
-// method.
-func (msg *produceMessage) enqueue(p *Producer) error {
-	if !msg.sync {
-		return p.addMessage(msg)
-	}
-
-	var prb produceRequestBuilder = []*produceMessage{msg}
-	bp, err := p.brokerProducerFor(msg.tp)
-	if err != nil {
-		return err
-	}
-	errs := make(chan error, 1)
-	bp.flushRequest(p, prb, func(err error) {
-		errs <- err
-		close(errs)
-	})
-	return <-errs
-
-}
-
-func (msg *produceMessage) reenqueue(p *Producer) error {
-	if msg.retried {
-		return DroppedMessagesError{}
-	}
-	msg.retried = true
-	return msg.enqueue(p)
-}
-
-func (msg *produceMessage) hasTopicPartition(topic string, partition int32) bool {
-	return msg.tp.partition == partition && msg.tp.topic == topic
-}
-
-func (b produceRequestBuilder) toRequest(config *ProducerConfig) *ProduceRequest {
-	req := &ProduceRequest{RequiredAcks: config.RequiredAcks, Timeout: int32(config.Timeout / time.Millisecond)}
-
-	// If compression is enabled, we need to group messages by topic-partition and
-	// wrap them in MessageSets. We already discarded that grouping, so we
-	// inefficiently re-sort them. This could be optimized (ie. pass a hash around
-	// rather than an array. Not sure what the best way is.
-	if config.Compression != CompressionNone {
-		msgSets := make(map[topicPartition]*MessageSet)
-		for _, pmsg := range b {
-			msgSet, ok := msgSets[pmsg.tp]
-			if !ok {
-				msgSet = new(MessageSet)
-				msgSets[pmsg.tp] = msgSet
-			}
-
-			msgSet.addMessage(&Message{Codec: CompressionNone, Key: pmsg.key, Value: pmsg.value})
-		}
-		for tp, msgSet := range msgSets {
-			valBytes, err := encode(msgSet)
-			if err != nil {
-				log.Fatal(err) // if this happens, it's basically our fault.
-			}
-			msg := Message{Codec: config.Compression, Key: nil, Value: valBytes}
-			req.AddMessage(tp.topic, tp.partition, &msg)
-		}
-		return req
-	}
-
-	// Compression is not enabled. Dumb-ly append each request directly to the
-	// request, with no MessageSet wrapper.
-	for _, pmsg := range b {
-		msg := Message{Codec: config.Compression, Key: pmsg.key, Value: pmsg.value}
-		req.AddMessage(pmsg.tp.topic, pmsg.tp.partition, &msg)
-	}
-	return req
-}
-
-func (msg *produceMessage) byteSize() uint32 {
-	return uint32(len(msg.key) + len(msg.value))
-}
-
-func (b produceRequestBuilder) byteSize() uint32 {
-	var size uint32
-	for _, m := range b {
-		size += m.byteSize()
-	}
-	return size
-}
-
-func (b produceRequestBuilder) reverseEach(fn func(m *produceMessage)) {
-	for i := len(b) - 1; i >= 0; i-- {
-		fn(b[i])
-	}
-}

+ 12 - 0
produce_request.go

@@ -77,3 +77,15 @@ func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)
 
 	set.addMessage(msg)
 }
+
+func (p *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) {
+	if p.msgSets == nil {
+		p.msgSets = make(map[string]map[int32]*MessageSet)
+	}
+
+	if p.msgSets[topic] == nil {
+		p.msgSets[topic] = make(map[int32]*MessageSet)
+	}
+
+	p.msgSets[topic][partition] = set
+}
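
AddSet installs a prebuilt MessageSet for a topic/partition in one step; the new flusher uses it on the uncompressed path instead of re-adding messages one at a time. A hedged sketch of the two paths, written as in-package code since MessageSet.addMessage is unexported (topic names and payloads are placeholders):

	req := &ProduceRequest{RequiredAcks: WaitForLocal, Timeout: 100}

	// per-message path: AddMessage lazily creates the set and appends to it
	req.AddMessage("my_topic", 0, &Message{Codec: CompressionNone, Value: []byte("a")})

	// whole-set path: AddSet drops an already-built set into place
	set := new(MessageSet)
	set.addMessage(&Message{Codec: CompressionNone, Value: []byte("b")})
	req.AddSet("my_topic", 1, set)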

+ 578 - 390
producer.go

@@ -6,72 +6,89 @@ import (
 	"time"
 )
 
+func forceFlushThreshold() int {
+	return int(MaxRequestSize - (10 * 1024)) // 10KiB is safety room for misc. overhead, we might want to calculate this more precisely?
+}
+
 // ProducerConfig is used to pass multiple configuration options to NewProducer.
-//
-// If MaxBufferTime=MaxBufferedBytes=1, messages will be delivered immediately and
-// constantly, but if multiple messages are received while a roundtrip to kafka
-// is in progress, they will both be combined into the next request. In this
-// mode, errors are not returned from SendMessage, but over the Errors()
-// channel.
 type ProducerConfig struct {
-	Partitioner  Partitioner  // Chooses the partition to send messages to, or randomly if this is nil.
-	RequiredAcks RequiredAcks // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
-
-	// The maximum duration the broker will wait the receipt of the number of RequiredAcks.
-	// This is only relevant when RequiredAcks is set to WaitForAll or a number > 1.
-	// Only supports millisecond resolution, nanoseconds will be truncated.
-	Timeout time.Duration
-
-	Compression      CompressionCodec // The type of compression to use on messages (defaults to no compression).
-	MaxBufferedBytes uint32           // The threshold number of bytes buffered before triggering a flush to the broker.
-	MaxBufferTime    time.Duration    // The maximum duration to buffer messages before triggering a flush to the broker.
-
-	// The maximum number of bytes allowed to accumulare in the buffer before back-pressure is applied to QueueMessage.
-	// Without this, queueing messages too fast will cause the producer to construct requests larger than the MaxRequestSize.
-	// Defaults to 50 MiB, cannot be more than (MaxRequestSize - 10 KiB).
-	BackPressureThresholdBytes uint32
+	Partitioner       Partitioner      // Chooses the partition to send messages to (defaults to random).
+	RequiredAcks      RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
+	Timeout           time.Duration    // The maximum duration the broker will wait for the receipt of the number of RequiredAcks. This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution, nanoseconds will be truncated.
+	Compression       CompressionCodec // The type of compression to use on messages (defaults to no compression).
+	FlushMsgCount     int              // The number of messages needed to trigger a flush.
+	FlushFrequency    time.Duration    // If this amount of time elapses without a flush, one will be queued.
+	FlushByteCount    int              // If this many bytes of messages are accumulated, a flush will be triggered.
+	AckSuccesses      bool             // If enabled, successfully delivered messages will also be returned on the Errors channel, with a nil Err field
+	MaxMessageBytes   int              // The maximum permitted size of a message (defaults to 1000000)
+	ChannelBufferSize int              // The size of the buffers of the channels between the different goroutines. Defaults to 0 (unbuffered).
+}
+
+// NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
+func NewProducerConfig() *ProducerConfig {
+	return &ProducerConfig{
+		Partitioner:     NewRandomPartitioner(),
+		RequiredAcks:    WaitForLocal,
+		MaxMessageBytes: 1000000,
+	}
+}
+
+// Validate checks a ProducerConfig instance. It will return a
+// ConfigurationError if the specified value doesn't make sense.
+func (config *ProducerConfig) Validate() error {
+	if config.RequiredAcks < -1 {
+		return ConfigurationError("Invalid RequiredAcks")
+	}
+
+	if config.Timeout < 0 {
+		return ConfigurationError("Invalid Timeout")
+	} else if config.Timeout%time.Millisecond != 0 {
+		Logger.Println("ProducerConfig.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
+	}
+
+	if config.FlushMsgCount < 0 {
+		return ConfigurationError("Invalid FlushMsgCount")
+	}
+
+	if config.FlushByteCount < 0 {
+		return ConfigurationError("Invalid FlushByteCount")
+	} else if config.FlushByteCount >= forceFlushThreshold() {
+		Logger.Println("ProducerConfig.FlushByteCount too close to MaxRequestSize; it will be ignored.")
+	}
+
+	if config.FlushFrequency < 0 {
+		return ConfigurationError("Invalid FlushFrequency")
+	}
+
+	if config.Partitioner == nil {
+		return ConfigurationError("No partitioner set")
+	}
+
+	if config.MaxMessageBytes <= 0 {
+		return ConfigurationError("Invalid MaxMessageBytes")
+	} else if config.MaxMessageBytes >= forceFlushThreshold() {
+		Logger.Println("ProducerConfig.MaxMessageBytes too close to MaxRequestSize; it will be ignored.")
+	}
+
+	return nil
 }
 
 // Producer publishes Kafka messages. It routes messages to the correct broker
 // for the provided topic-partition, refreshing metadata as appropriate, and
-// parses responses for errors. You must call Close() on a producer to avoid
-// leaks: it may not be garbage-collected automatically when it passes out of
+// parses responses for errors. You must read from the Errors() channel or the
+// producer will deadlock. You must call Close() on a producer to avoid
+// leaks: it will not be garbage-collected automatically when it passes out of
 // scope (this is in addition to calling Close on the underlying client, which
 // is still necessary).
-//
-// The default values for MaxBufferedBytes and MaxBufferTime cause sarama to
-// deliver messages immediately, but to buffer subsequent messages while a
-// previous request is in-flight. This is often the correct behaviour.
-//
-// If synchronous operation is desired, you can use SendMessage. This will cause
-// sarama to block until the broker has returned a value. Normally, you will
-// want to use QueueMessage instead, and read the error back from the Errors()
-// channel. Note that when using QueueMessage, you *must* read the values from
-// the Errors() channel, or sarama will block indefinitely after a few requests.
 type Producer struct {
-	client          *Client
-	config          ProducerConfig
-	brokerProducers map[*Broker]*brokerProducer
-	m               sync.RWMutex
-	errors          chan error
-	deliveryLocks   map[topicPartition]chan bool
-	dm              sync.RWMutex
-}
+	client *Client
+	config ProducerConfig
 
-type brokerProducer struct {
-	mapM          sync.Mutex
-	messages      map[topicPartition][]*produceMessage
-	bufferedBytes uint32
-	flushNow      chan bool
-	broker        *Broker
-	stopper       chan bool
-	done          chan bool
-	hasMessages   chan bool
-}
+	errors         chan *ProduceError
+	input, retries chan *MessageToSend
 
-type topicPartition struct {
-	topic     string
-	partition int32
+	brokers    map[*Broker]*brokerWorker
+	brokerLock sync.Mutex
 }
 
 // NewProducer creates a new Producer using the given client.
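
The rewritten config replaces the old byte-buffer thresholds with three independent flush triggers; leaving them all at zero flushes on every message. A hedged sketch of building and validating a batching config (values are illustrative; the time import is assumed):

	config := NewProducerConfig()
	config.FlushMsgCount = 500                     // flush after 500 buffered messages...
	config.FlushByteCount = 64 * 1024              // ...or 64KiB of buffered message data...
	config.FlushFrequency = 100 * time.Millisecond // ...or 100ms since the last flush
	config.Compression = CompressionSnappy
	config.AckSuccesses = true // also report successes (with a nil Err) on Errors()

	if err := config.Validate(); err != nil {
		panic(err)
	}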
@@ -90,445 +107,616 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 		return nil, err
 	}
 
-	return &Producer{
-		client:          client,
-		config:          *config,
-		errors:          make(chan error, 16),
-		deliveryLocks:   make(map[topicPartition]chan bool),
-		brokerProducers: make(map[*Broker]*brokerProducer),
-	}, nil
+	p := &Producer{
+		client:  client,
+		config:  *config,
+		errors:  make(chan *ProduceError),
+		input:   make(chan *MessageToSend),
+		retries: make(chan *MessageToSend),
+		brokers: make(map[*Broker]*brokerWorker),
+	}
+
+	// launch our singleton dispatchers
+	go withRecover(p.topicDispatcher)
+	go withRecover(p.retryHandler)
+
+	return p, nil
+}
+
+type flagSet int8
+
+const (
+	retried  flagSet = 1 << iota // message has been retried
+	chaser                       // message is last in a group that failed
+	ref                          // add a reference to a singleton channel
+	unref                        // remove a reference from a singleton channel
+	shutdown                     // start the shutdown process
+)
+
+// MessageToSend is the collection of elements passed to the Producer in order to send a message.
+type MessageToSend struct {
+	Topic      string
+	Key, Value Encoder
+
+	// these are filled in by the producer as the message is processed
+	broker    *Broker
+	offset    int64
+	partition int32
+	flags     flagSet
+}
+
+// Offset is the offset of the message stored on the broker. This is only guaranteed to be defined if
+// the message was successfully delivered and RequiredAcks is not NoResponse.
+func (m *MessageToSend) Offset() int64 {
+	return m.offset
+}
+
+// Partition is the partition that the message was sent to. This is only guaranteed to be defined if
+// the message was successfully delivered.
+func (m *MessageToSend) Partition() int32 {
+	return m.partition
+}
+
+func (m *MessageToSend) byteSize() int {
+	size := 26 // the metadata overhead of CRC, flags, etc.
+	if m.Key != nil {
+		size += m.Key.Length()
+	}
+	if m.Value != nil {
+		size += m.Value.Length()
+	}
+	return size
+}
+
+// ProduceError is the type of error generated when the producer fails to deliver a message.
+// It contains the original MessageToSend as well as the actual error value. If the AckSuccesses configuration
+// value is set to true then every message sent generates a ProduceError, but successes will have a nil Err field.
+type ProduceError struct {
+	Msg *MessageToSend
+	Err error
+}
+
+// ProduceErrors is a type that wraps a batch of "ProduceError"s and implements the Error interface.
+// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
+// when closing a producer.
+type ProduceErrors []*ProduceError
+
+func (pe ProduceErrors) Error() string {
+	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
 }
 
-// Errors provides access to errors generated while parsing ProduceResponses from kafka
-// when operating in asynchronous mode. Should never be called in synchronous mode.
-func (p *Producer) Errors() chan error {
+// Errors is the output channel back to the user. You MUST read from this channel or the Producer will deadlock.
+// It is suggested that you send messages and read errors together in a single select statement.
+func (p *Producer) Errors() <-chan *ProduceError {
 	return p.errors
 }
 
+// Input is the input channel for the user to write messages to that they wish to send.
+func (p *Producer) Input() chan<- *MessageToSend {
+	return p.input
+}
+
 // Close shuts down the producer and flushes any messages it may have buffered.
 // You must call this function before a producer object passes out of scope, as
 // it may otherwise leak memory. You must call this before calling Close on the
 // underlying client.
 func (p *Producer) Close() error {
-	p.m.RLock()
-	defer p.m.RUnlock()
-	for _, bp := range p.brokerProducers {
-		bp.Close()
+	go func() {
+		p.input <- &MessageToSend{flags: shutdown}
+		p.retries <- &MessageToSend{flags: shutdown}
+	}()
+
+	var errors ProduceErrors
+	for event := range p.errors {
+		errors = append(errors, event)
 	}
-	return nil
+	return errors
 }
 
-// QueueMessage sends a message with the given key and value to the given topic.
-// The partition to send to is selected by the Producer's Partitioner. To send
-// strings as either key or value, see the StringEncoder type.
-//
-// QueueMessage uses buffering semantics to reduce the nubmer of requests to the
-// broker. The buffer logic is tunable with config.MaxBufferedBytes and
-// config.MaxBufferTime.
-//
-// QueueMessage will return an error if it's unable to construct the message
-// (unlikely), but network and response errors must be read from Errors(), since
-// QueueMessage uses asynchronous delivery. Note that you MUST read back from
-// Errors(), otherwise the producer will stall after some number of errors.
-//
-// If you care about message ordering, you should not call QueueMessage and
-// SendMessage on the same Producer. Either, used alone, preserves ordering,
-// however.
-func (p *Producer) QueueMessage(topic string, key, value Encoder) error {
-	return p.genericSendMessage(topic, key, value, false)
-}
+///////////////////////////////////////////
+// In normal processing, a message flows through the following functions from top to bottom,
+// starting at topicDispatcher (which reads from Producer.input) and ending in flusher
+// (which sends the message to the broker). In cases where a message must be retried, it goes
+// through retryHandler before being returned to the top of the flow.
+///////////////////////////////////////////
+
+// singleton
+func (p *Producer) topicDispatcher() {
+	handlers := make(map[string]chan *MessageToSend)
+
+	for msg := range p.input {
+		if msg == nil {
+			Logger.Printf("somebody sent a nil message to the producer, it was ignored")
+			continue
+		}
 
-// SendMessage sends a message with the given key and value to the given topic.
-// The partition to send to is selected by the Producer's Partitioner. To send
-// strings as either key or value, see the StringEncoder type.
-//
-// Unlike QueueMessage, SendMessage operates synchronously, and will block until
-// the response is received from the broker, returning any error generated in
-// the process. Reading from Errors() may interfere with the operation of
-// SendMessage().
-//
-// If you care about message ordering, you should not call QueueMessage and
-// SendMessage on the same Producer.
-func (p *Producer) SendMessage(topic string, key, value Encoder) (err error) {
-	return p.genericSendMessage(topic, key, value, true)
-}
+		if msg.flags&shutdown != 0 {
+			break
+		}
 
-func (p *Producer) genericSendMessage(topic string, key, value Encoder, synchronous bool) (err error) {
-	var keyBytes, valBytes []byte
+		if (p.config.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.config.MaxMessageBytes) ||
+			(msg.byteSize() > p.config.MaxMessageBytes) {
 
-	if key != nil {
-		if keyBytes, err = key.Encode(); err != nil {
-			return err
+			p.errors <- &ProduceError{Msg: msg, Err: MessageSizeTooLarge}
+			continue
 		}
-	}
-	if value != nil {
-		if valBytes, err = value.Encode(); err != nil {
-			return err
+
+		handler := handlers[msg.Topic]
+		if handler == nil {
+			newHandler := make(chan *MessageToSend, p.config.ChannelBufferSize)
+			go withRecover(func() { p.partitionDispatcher(msg.Topic, newHandler) })
+			handler = newHandler
+			handlers[msg.Topic] = handler
 		}
+
+		handler <- msg
 	}
 
-	partition, err := p.choosePartition(topic, key)
-	if err != nil {
-		return err
+	for _, handler := range handlers {
+		close(handler)
 	}
 
-	// produce_message.go
-	msg := &produceMessage{
-		tp:    topicPartition{topic, partition},
-		key:   keyBytes,
-		value: valBytes,
-		sync:  synchronous,
+	for msg := range p.input {
+		p.errors <- &ProduceError{Msg: msg, Err: ShuttingDown}
 	}
 
-	// produce_message.go
-	return msg.enqueue(p)
+	close(p.errors)
 }
 
-func (p *Producer) addMessage(msg *produceMessage) error {
-	bp, err := p.brokerProducerFor(msg.tp)
-	if err != nil {
-		return err
-	}
-	bp.addMessage(msg, p.config.MaxBufferedBytes, p.config.BackPressureThresholdBytes)
-	return nil
-}
+// one per topic
+func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend) {
+	handlers := make(map[int32]chan *MessageToSend)
 
-func (p *Producer) brokerProducerFor(tp topicPartition) (*brokerProducer, error) {
-	broker, err := p.client.Leader(tp.topic, tp.partition)
-	if err != nil {
-		return nil, err
-	}
-
-	p.m.RLock()
-	bp, ok := p.brokerProducers[broker]
-	p.m.RUnlock()
-	if !ok {
-		p.m.Lock()
-		bp, ok = p.brokerProducers[broker]
-		if !ok {
-			bp = p.newBrokerProducer(broker)
-			p.brokerProducers[broker] = bp
+	for msg := range input {
+		if msg.flags&retried == 0 {
+			err := p.assignPartition(msg)
+			if err != nil {
+				p.errors <- &ProduceError{Msg: msg, Err: err}
+				continue
+			}
 		}
-		p.m.Unlock()
-	}
 
-	return bp, nil
-}
+		handler := handlers[msg.partition]
+		if handler == nil {
+			newHandler := make(chan *MessageToSend, p.config.ChannelBufferSize)
+			go withRecover(func() { p.leaderDispatcher(msg.Topic, msg.partition, newHandler) })
+			handler = newHandler
+			handlers[msg.partition] = handler
+		}
 
-func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
-	bp := &brokerProducer{
-		messages:    make(map[topicPartition][]*produceMessage),
-		flushNow:    make(chan bool, 1),
-		broker:      broker,
-		stopper:     make(chan bool),
-		done:        make(chan bool),
-		hasMessages: make(chan bool, 1),
+		handler <- msg
 	}
 
-	var wg sync.WaitGroup
-	wg.Add(1)
+	for _, handler := range handlers {
+		close(handler)
+	}
+}
 
-	go func() {
-		timer := time.NewTimer(p.config.MaxBufferTime)
-		var shutdownRequired bool
-		wg.Done()
-		for {
-			select {
-			case <-bp.flushNow:
-				if shutdownRequired = bp.flush(p); shutdownRequired {
-					goto shutdown
+// one per partition per topic
+func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *MessageToSend) {
+	var leader *Broker
+	var output chan *MessageToSend
+	var backlog []*MessageToSend
+
+	for msg := range input {
+		if msg.flags&retried == 0 {
+			// normal case
+			if backlog != nil {
+				backlog = append(backlog, msg)
+				continue
+			}
+		} else if msg.flags&chaser == 0 {
+			// retry flag set, chaser flag not set
+			if backlog == nil {
+				// on the very first retried message we send off a chaser so that we know when everything "in between" has made it
+				// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
+				output <- &MessageToSend{Topic: topic, partition: partition, broker: leader, flags: chaser}
+				backlog = make([]*MessageToSend, 0)
+				p.unrefBrokerWorker(leader)
+				output = nil
+			}
+		} else {
+			// retry *and* chaser flag set, flush the backlog and return to normal processing
+			if output == nil {
+				err := p.client.RefreshTopicMetadata(topic)
+				if err != nil {
+					p.returnMessages(backlog, err)
+					backlog = nil
+					continue
 				}
-			case <-timer.C:
-				if shutdownRequired = bp.flushIfAnyMessages(p); shutdownRequired {
-					goto shutdown
+
+				leader, err = p.client.Leader(topic, partition)
+				if err != nil {
+					p.returnMessages(backlog, err)
+					backlog = nil
+					continue
 				}
-			case <-bp.stopper:
-				goto shutdown
+
+				output = p.getBrokerWorker(leader)
 			}
-			timer.Reset(p.config.MaxBufferTime)
-		}
-	shutdown:
-		delete(p.brokerProducers, bp.broker)
-		bp.flushIfAnyMessages(p)
-		if shutdownRequired {
-			p.client.disconnectBroker(bp.broker)
+
+			for _, msg := range backlog {
+				msg.broker = leader
+				output <- msg
+			}
+
+			backlog = nil
+			continue
 		}
-		close(bp.flushNow)
-		close(bp.hasMessages)
-		close(bp.done)
-	}()
-	wg.Wait() // don't return until the G has started
 
-	return bp
-}
+		if output == nil {
+			var err error
+			if backlog != nil {
+				err = p.client.RefreshTopicMetadata(topic)
+				if err != nil {
+					p.errors <- &ProduceError{Msg: msg, Err: err}
+					continue
+				}
+			}
 
-func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes, backPressureThreshold uint32) {
-	bp.mapM.Lock()
-	if msg.retried {
-		// Prepend: Deliver first, before any more recently-added messages.
-		bp.messages[msg.tp] = append([]*produceMessage{msg}, bp.messages[msg.tp]...)
-	} else {
-		// Append
-		bp.messages[msg.tp] = append(bp.messages[msg.tp], msg)
-	}
-	bp.bufferedBytes += msg.byteSize()
+			leader, err = p.client.Leader(topic, partition)
+			if err != nil {
+				p.errors <- &ProduceError{Msg: msg, Err: err}
+				continue
+			}
 
-	select {
-	case bp.hasMessages <- true:
-	default:
+			output = p.getBrokerWorker(leader)
+		}
+
+		msg.broker = leader
+		output <- msg
 	}
 
-	bp.mapM.Unlock()
-	bp.flushIfOverCapacity(maxBufferBytes, backPressureThreshold)
+	p.unrefBrokerWorker(leader)
 }
 
-func (bp *brokerProducer) flushIfOverCapacity(maxBufferBytes, backPressureThreshold uint32) {
-	bp.mapM.Lock()
-	softlimit := bp.bufferedBytes > maxBufferBytes
-	hardlimit := bp.bufferedBytes > backPressureThreshold
-	bp.mapM.Unlock()
+// one per broker
+func (p *Producer) messageAggregator(broker *Broker, input chan *MessageToSend) {
+	var ticker *time.Ticker
+	var timer <-chan time.Time
+	if p.config.FlushFrequency > 0 {
+		ticker = time.NewTicker(p.config.FlushFrequency)
+		timer = ticker.C
+	}
+
+	var buffer []*MessageToSend
+	var doFlush chan []*MessageToSend
+	var bytesAccumulated int
 
-	if hardlimit {
-		bp.flushNow <- true
-	} else if softlimit {
+	flusher := make(chan []*MessageToSend)
+	go withRecover(func() { p.flusher(broker, flusher) })
+
+	for {
 		select {
-		case bp.flushNow <- true:
-		default:
+		case msg := <-input:
+			if msg == nil {
+				goto shutdown
+			}
+
+			if bytesAccumulated+msg.byteSize() >= forceFlushThreshold() {
+				flusher <- buffer
+				buffer = nil
+				doFlush = nil
+				bytesAccumulated = 0
+			}
+
+			buffer = append(buffer, msg)
+			bytesAccumulated += msg.byteSize()
+
+			if len(buffer) >= p.config.FlushMsgCount ||
+				(p.config.FlushByteCount > 0 && bytesAccumulated >= p.config.FlushByteCount) {
+				doFlush = flusher
+			}
+		case <-timer:
+			doFlush = flusher
+		case doFlush <- buffer:
+			buffer = nil
+			doFlush = nil
+			bytesAccumulated = 0
 		}
 	}
-}
 
-func (bp *brokerProducer) flushIfAnyMessages(p *Producer) (shutdownRequired bool) {
-	select {
-	case <-bp.hasMessages:
-		select {
-		case bp.hasMessages <- true:
-		default:
-		}
-		return bp.flush(p)
-	default:
+shutdown:
+	if ticker != nil {
+		ticker.Stop()
+	}
+	if len(buffer) > 0 {
+		flusher <- buffer
 	}
-	return false
+	close(flusher)
 }
 
-func (bp *brokerProducer) flush(p *Producer) (shutdownRequired bool) {
-	var prb produceRequestBuilder
+// one per broker
+func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
+	var closing error
+	currentRetries := make(map[string]map[int32]error)
 
-	// only deliver messages for topic-partitions that are not currently being delivered.
-	bp.mapM.Lock()
-	for tp, messages := range bp.messages {
-		if len(messages) > 0 && p.tryAcquireDeliveryLock(tp) {
-			prb = append(prb, messages...)
-			delete(bp.messages, tp)
-			p.releaseDeliveryLock(tp)
+	p.retries <- &MessageToSend{flags: ref}
+	for batch := range input {
+		if closing != nil {
+			p.retryMessages(batch, closing)
+			continue
 		}
-	}
-	bp.mapM.Unlock()
-
-	if len(prb) > 0 {
-		bp.mapM.Lock()
-		bp.bufferedBytes -= prb.byteSize()
-		bp.mapM.Unlock()
 
-		return bp.flushRequest(p, prb, func(err error) {
-			if err != nil {
-				Logger.Println(err)
+		// group messages by topic/partition
+		msgSets := make(map[string]map[int32][]*MessageToSend)
+		for i, msg := range batch {
+			if currentRetries[msg.Topic] != nil && currentRetries[msg.Topic][msg.partition] != nil {
+				if msg.flags&chaser == chaser {
+					// we can start processing this topic/partition again
+					currentRetries[msg.Topic][msg.partition] = nil
+				}
+				p.retryMessages([]*MessageToSend{msg}, currentRetries[msg.Topic][msg.partition])
+				batch[i] = nil // to prevent it being returned/retried twice
+				continue
 			}
-			p.errors <- err
-		})
-	}
-	return false
-}
 
-func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, errorCb func(error)) (shutdownRequired bool) {
-	// produce_message.go
-	req := prb.toRequest(&p.config)
-	response, err := bp.broker.Produce(p.client.id, req)
-
-	switch err {
-	case nil:
-		break
-	case EncodingError:
-		// No sense in retrying; it'll just fail again. But what about all the other
-		// messages that weren't invalid? Really, this is a "shit's broke real good"
-		// scenario, so logging it and moving on is probably acceptable.
-		errorCb(err)
-		return false
-	default:
-		p.client.disconnectBroker(bp.broker)
-		overlimit := 0
-		prb.reverseEach(func(msg *produceMessage) {
-			if err := msg.reenqueue(p); err != nil {
-				overlimit++
+			partitionSet := msgSets[msg.Topic]
+			if partitionSet == nil {
+				partitionSet = make(map[int32][]*MessageToSend)
+				msgSets[msg.Topic] = partitionSet
 			}
-		})
-		if overlimit > 0 {
-			errorCb(DroppedMessagesError{overlimit, err})
-		} else {
-			errorCb(nil)
+
+			partitionSet[msg.partition] = append(partitionSet[msg.partition], msg)
 		}
-		return true
-	}
 
-	// When does this ever actually happen, and why don't we explode when it does?
-	// This seems bad.
-	if response == nil {
-		errorCb(nil)
-		return false
-	}
+		request := p.buildRequest(msgSets)
+		if request == nil {
+			continue
+		}
 
-	seenPartitions := false
-	for topic, d := range response.Blocks {
-		for partition, block := range d {
-			seenPartitions = true
+		response, err := broker.Produce(p.client.id, request)
+
+		switch err {
+		case nil:
+			break
+		case EncodingError:
+			p.returnMessages(batch, err)
+			continue
+		default:
+			p.client.disconnectBroker(broker)
+			closing = err
+			p.retryMessages(batch, err)
+			continue
+		}
 
-			if block == nil {
-				// IncompleteResponse. Here we just drop all the messages; we don't know whether
-				// they were successfully sent or not. Non-ideal, but how often does it happen?
-				errorCb(DroppedMessagesError{len(prb), IncompleteResponse})
+		if response == nil {
+			// this only happens when RequiredAcks is NoResponse, so we have to assume success
+			if p.config.AckSuccesses {
+				p.returnMessages(batch, nil)
 			}
-			switch block.Err {
-			case NoError:
-				// All the messages for this topic-partition were delivered successfully!
-				// Unlock delivery for this topic-partition and discard the produceMessage objects.
-				errorCb(nil)
-			case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
-				p.client.RefreshTopicMetadata(topic)
-
-				overlimit := 0
-				prb.reverseEach(func(msg *produceMessage) {
-					if msg.hasTopicPartition(topic, partition) {
-						if err := msg.reenqueue(p); err != nil {
-							overlimit++
+			continue
+		}
+
+		// we iterate through the blocks in the request, not the response, so that we notice
+		// if the response is missing a block completely
+		for topic, partitionSet := range msgSets {
+			for partition, msgs := range partitionSet {
+
+				block := response.GetBlock(topic, partition)
+				if block == nil {
+					p.returnMessages(msgs, IncompleteResponse)
+					continue
+				}
+
+				switch block.Err {
+				case NoError:
+					// All the messages for this topic-partition were delivered successfully!
+					if p.config.AckSuccesses {
+						for i := range msgs {
+							msgs[i].offset = block.Offset + int64(i)
 						}
+						p.returnMessages(msgs, nil)
+					}
+				case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
+					if currentRetries[topic] == nil {
+						currentRetries[topic] = make(map[int32]error)
 					}
-				})
-				if overlimit > 0 {
-					errorCb(DroppedMessagesError{overlimit, err})
-				} else {
-					errorCb(nil)
+					currentRetries[topic][partition] = block.Err
+					p.retryMessages(msgs, block.Err)
+				default:
+					p.returnMessages(msgs, block.Err)
 				}
-			default:
-				errorCb(DroppedMessagesError{len(prb), err})
 			}
 		}
 	}
-
-	if !seenPartitions {
-		errorCb(DroppedMessagesError{len(prb), IncompleteResponse})
-	}
-
-	return false
+	p.retries <- &MessageToSend{flags: unref}
 }
 
-func (bp *brokerProducer) Close() error {
-	select {
-	case <-bp.stopper:
-		return fmt.Errorf("already closed or closing")
-	default:
-		close(bp.stopper)
-		<-bp.done
-	}
-	return nil
-}
+// singleton
+func (p *Producer) retryHandler() {
+	var buf []*MessageToSend
+	var msg *MessageToSend
+	refs := 0
+	shuttingDown := false
 
-func (p *Producer) tryAcquireDeliveryLock(tp topicPartition) bool {
-	p.dm.RLock()
-	ch, ok := p.deliveryLocks[tp]
-	p.dm.RUnlock()
-	if !ok {
-		p.dm.Lock()
-		ch, ok = p.deliveryLocks[tp]
-		if !ok {
-			ch = make(chan bool, 1)
-			p.deliveryLocks[tp] = ch
+	for {
+		if len(buf) == 0 {
+			msg = <-p.retries
+		} else {
+			select {
+			case msg = <-p.retries:
+			case p.input <- buf[0]:
+				buf = buf[1:]
+				continue
+			}
 		}
-		p.dm.Unlock()
-	}
 
-	select {
-	case ch <- true:
-		return true
-	default:
-		return false
+		if msg.flags&ref != 0 {
+			refs++
+		} else if msg.flags&unref != 0 {
+			refs--
+			if refs == 0 && shuttingDown {
+				break
+			}
+		} else if msg.flags&shutdown != 0 {
+			shuttingDown = true
+			if refs == 0 {
+				break
+			}
+		} else {
+			buf = append(buf, msg)
+		}
 	}
-}
 
-func (p *Producer) releaseDeliveryLock(tp topicPartition) {
-	p.dm.RLock()
-	ch := p.deliveryLocks[tp]
-	p.dm.RUnlock()
-	select {
-	case <-ch:
-	default:
-		panic("Serious logic bug: releaseDeliveryLock called without acquiring lock first.")
+	close(p.retries)
+	for i := range buf {
+		p.input <- buf[i]
 	}
+	close(p.input)
 }
 
-func (p *Producer) choosePartition(topic string, key Encoder) (int32, error) {
-	partitions, err := p.client.Partitions(topic)
+///////////////////////////////////////////
+///////////////////////////////////////////
+
+// utility functions
+
+func (p *Producer) assignPartition(msg *MessageToSend) error {
+	partitions, err := p.client.Partitions(msg.Topic)
 	if err != nil {
-		return -1, err
+		return err
 	}
 
 	numPartitions := int32(len(partitions))
 
 	if numPartitions == 0 {
-		return -1, LeaderNotAvailable
+		return LeaderNotAvailable
 	}
 
-	choice := p.config.Partitioner.Partition(key, numPartitions)
+	choice := p.config.Partitioner.Partition(msg.Key, numPartitions)
 
 	if choice < 0 || choice >= numPartitions {
-		return -1, InvalidPartition
+		return InvalidPartition
 	}
 
-	return partitions[choice], nil
-}
+	msg.partition = partitions[choice]
 
-// NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
-func NewProducerConfig() *ProducerConfig {
-	return &ProducerConfig{
-		Partitioner:                NewRandomPartitioner(),
-		RequiredAcks:               WaitForLocal,
-		MaxBufferTime:              1 * time.Millisecond,
-		MaxBufferedBytes:           1,
-		BackPressureThresholdBytes: 50 * 1024 * 1024,
-	}
+	return nil
 }
 
-// Validate checks a ProducerConfig instance. It will return a
-// ConfigurationError if the specified value doesn't make sense.
-func (config *ProducerConfig) Validate() error {
-	if config.RequiredAcks < -1 {
-		return ConfigurationError("Invalid RequiredAcks")
-	}
+func (p *Producer) buildRequest(batch map[string]map[int32][]*MessageToSend) *ProduceRequest {
+
+	req := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: int32(p.config.Timeout / time.Millisecond)}
+	empty := true
+
+	for topic, partitionSet := range batch {
+		for partition, msgSet := range partitionSet {
+			setToSend := new(MessageSet)
+			setSize := 0
+			for _, msg := range msgSet {
+				var keyBytes, valBytes []byte
+				var err error
+				if msg.Key != nil {
+					if keyBytes, err = msg.Key.Encode(); err != nil {
+						p.errors <- &ProduceError{Msg: msg, Err: err}
+						continue
+					}
+				}
+				if msg.Value != nil {
+					if valBytes, err = msg.Value.Encode(); err != nil {
+						p.errors <- &ProduceError{Msg: msg, Err: err}
+						continue
+					}
+				}
 
-	if config.Timeout < 0 {
-		return ConfigurationError("Invalid Timeout")
-	} else if config.Timeout%time.Millisecond != 0 {
-		Logger.Println("ProducerConfig.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
-	}
+				if p.config.Compression != CompressionNone && setSize+msg.byteSize() > p.config.MaxMessageBytes {
+					// compression causes message-sets to be wrapped as single messages, which have tighter
+					// size requirements, so we have to respect those limits
+					valBytes, err := encode(setToSend)
+					if err != nil {
+						Logger.Println(err) // if this happens, it's basically our fault.
+						panic(err)
+					}
+					req.AddMessage(topic, partition, &Message{Codec: p.config.Compression, Key: nil, Value: valBytes})
+					setToSend = new(MessageSet)
+					setSize = 0
+				}
+				setSize += msg.byteSize()
 
-	if config.MaxBufferedBytes == 0 {
-		return ConfigurationError("Invalid MaxBufferedBytes")
+				setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes})
+				empty = false
+			}
+
+			if p.config.Compression == CompressionNone {
+				req.AddSet(topic, partition, setToSend)
+			} else {
+				valBytes, err := encode(setToSend)
+				if err != nil {
+					Logger.Println(err) // if this happens, it's basically our fault.
+					panic(err)
+				}
+				req.AddMessage(topic, partition, &Message{Codec: p.config.Compression, Key: nil, Value: valBytes})
+			}
+		}
 	}
 
-	if config.MaxBufferTime == 0 {
-		return ConfigurationError("Invalid MaxBufferTime")
+	if empty {
+		return nil
+	} else {
+		return req
 	}
+}
 
-	if config.Partitioner == nil {
-		return ConfigurationError("No partitioner set")
+func (p *Producer) returnMessages(batch []*MessageToSend, err error) {
+	for _, msg := range batch {
+		if msg == nil {
+			continue
+		}
+		p.errors <- &ProduceError{Msg: msg, Err: err}
 	}
+}
 
-	if config.BackPressureThresholdBytes < config.MaxBufferedBytes {
-		return ConfigurationError("BackPressureThresholdBytes cannot be less than MaxBufferedBytes")
+func (p *Producer) retryMessages(batch []*MessageToSend, err error) {
+	Logger.Println("Producer requeueing batch of", len(batch), "messages due to error:", err)
+	for _, msg := range batch {
+		if msg == nil {
+			continue
+		}
+		if msg.flags&retried == retried {
+			p.errors <- &ProduceError{Msg: msg, Err: err}
+		} else {
+			msg.flags |= retried
+			p.retries <- msg
+		}
 	}
+	Logger.Println("Messages requeued")
+}
 
-	if config.BackPressureThresholdBytes > MaxRequestSize-10*1024 {
-		return ConfigurationError("BackPressureThresholdBytes must be at least 10KiB less than MaxRequestSize")
+type brokerWorker struct {
+	input chan *MessageToSend
+	refs  int
+}
+
+func (p *Producer) getBrokerWorker(broker *Broker) chan *MessageToSend {
+	p.brokerLock.Lock()
+	defer p.brokerLock.Unlock()
+
+	worker := p.brokers[broker]
+
+	if worker == nil {
+		worker = &brokerWorker{
+			refs:  1,
+			input: make(chan *MessageToSend),
+		}
+		p.brokers[broker] = worker
+		go withRecover(func() { p.messageAggregator(broker, worker.input) })
+	} else {
+		worker.refs++
 	}
 
-	return nil
+	return worker.input
+}
+
+func (p *Producer) unrefBrokerWorker(broker *Broker) {
+	p.brokerLock.Lock()
+	defer p.brokerLock.Unlock()
+
+	worker := p.brokers[broker]
+
+	if worker != nil {
+		worker.refs--
+		if worker.refs == 0 {
+			close(worker.input)
+		}
+	}
 }
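
Putting the new pieces together: messages go in through Input(), results come back as ProduceError values on Errors(), and Close() flushes the buffers and returns anything undelivered as a ProduceErrors batch. A hedged end-to-end sketch in the style of the package's Example functions (broker address, topic and payload are placeholders):

	func produceSketch() {
		client, err := NewClient("sketch", []string{"localhost:9092"}, NewClientConfig())
		if err != nil {
			panic(err)
		}
		defer client.Close()

		config := NewProducerConfig()
		config.FlushMsgCount = 10
		producer, err := NewProducer(client, config)
		if err != nil {
			panic(err)
		}

		for i := 0; i < 100; i++ {
			select {
			case producer.Input() <- &MessageToSend{Topic: "my_topic", Value: StringEncoder("payload")}:
			case pe := <-producer.Errors(): // must be drained or the producer deadlocks
				Logger.Println("failed to deliver to", pe.Msg.Topic, ":", pe.Err)
			}
		}

		// Close flushes what is still buffered; remaining failures come back as ProduceErrors.
		if errs, ok := producer.Close().(ProduceErrors); ok && len(errs) > 0 {
			Logger.Println(len(errs), "messages were not delivered")
		}
	}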

+ 60 - 285
producer_test.go

@@ -3,18 +3,10 @@ package sarama
 import (
 	"fmt"
 	"testing"
-	"time"
 )
 
 const TestMessage = "ABC THE MESSAGE"
 
-func defaultProducerConfig() *ProducerConfig {
-	config := NewProducerConfig()
-	config.MaxBufferTime = 1000000 * time.Millisecond             // don't flush based on time
-	config.MaxBufferedBytes = uint32((len(TestMessage) * 10) - 1) // flush after 10 messages
-	return config
-}
-
 func TestDefaultProducerConfigValidates(t *testing.T) {
 	config := NewProducerConfig()
 	if err := config.Validate(); err != nil {
@@ -23,289 +15,95 @@ func TestDefaultProducerConfigValidates(t *testing.T) {
 }
 
 func TestSimpleProducer(t *testing.T) {
-
-	mb1 := NewMockBroker(t, 1)
-	mb2 := NewMockBroker(t, 2)
-	defer mb1.Close()
-	defer mb2.Close()
-
-	mdr := new(MetadataResponse)
-	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2)
-	mb1.Returns(mdr)
-
-	pr := new(ProduceResponse)
-	pr.AddTopicPartition("my_topic", 0, NoError)
-	mb2.Returns(pr)
-
-	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	producer, err := NewProducer(client, defaultProducerConfig())
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer producer.Close()
-
-	// flush only on 10th and final message
-	returns := []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
-	for _, f := range returns {
-		sendMessage(t, producer, "my_topic", TestMessage, f)
-	}
-}
-
-func TestSimpleSyncProducer(t *testing.T) {
-
-	mb1 := NewMockBroker(t, 1)
-	mb2 := NewMockBroker(t, 2)
-	defer mb1.Close()
-	defer mb2.Close()
-
-	mdr := new(MetadataResponse)
-	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 1, 2)
-	mb1.Returns(mdr)
-
-	pr := new(ProduceResponse)
-	pr.AddTopicPartition("my_topic", 1, NoError)
-
+	broker1 := NewMockBroker(t, 1)
+	broker2 := NewMockBroker(t, 2)
+	defer broker1.Close()
+	defer broker2.Close()
+
+	response1 := new(MetadataResponse)
+	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, 2)
+	broker1.Returns(response1)
+
+	response2 := new(ProduceResponse)
+	response2.AddTopicPartition("my_topic", 0, NoError)
 	for i := 0; i < 10; i++ {
-		mb2.Returns(pr)
+		broker2.Returns(response2)
 	}
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	producer, err := NewProducer(client, defaultProducerConfig())
+	producer, err := NewSimpleProducer(client, "my_topic", nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer producer.Close()
 
 	for i := 0; i < 10; i++ {
-		sendSyncMessage(t, producer, "my_topic", TestMessage)
+		err = producer.SendMessage(nil, StringEncoder(TestMessage))
+		if err != nil {
+			t.Error(err)
+		}
 	}
 }
 
-func TestMultipleFlushes(t *testing.T) {
-
-	mb1 := NewMockBroker(t, 1)
-	mb2 := NewMockBroker(t, 2)
-	defer mb1.Close()
-	defer mb2.Close()
+func TestProducer(t *testing.T) {
+	broker1 := NewMockBroker(t, 1)
+	broker2 := NewMockBroker(t, 2)
+	defer broker1.Close()
+	defer broker2.Close()
 
-	mdr := new(MetadataResponse)
-	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2)
-	mb1.Returns(mdr)
+	response1 := new(MetadataResponse)
+	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, 2)
+	broker1.Returns(response1)
 
-	pr := new(ProduceResponse)
-	pr.AddTopicPartition("my_topic", 0, NoError)
-	pr.AddTopicPartition("my_topic", 0, NoError)
-	mb2.Returns(pr)
-	mb2.Returns(pr) // yes, twice.
+	response2 := new(ProduceResponse)
+	response2.AddTopicPartition("my_topic", 0, NoError)
+	broker2.Returns(response2)
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	config := defaultProducerConfig()
-	// So that we flush after the 2nd message.
-	config.MaxBufferedBytes = uint32((len(TestMessage) * 5) - 1)
+	config := NewProducerConfig()
+	config.FlushMsgCount = 10
+	config.AckSuccesses = true
 	producer, err := NewProducer(client, config)
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer producer.Close()
 
-	returns := []int{0, 0, 0, 0, 1, 0, 0, 0, 0, 1}
-	for _, f := range returns {
-		sendMessage(t, producer, "my_topic", TestMessage, f)
-	}
-}
-
-func TestMultipleProducer(t *testing.T) {
-
-	mb1 := NewMockBroker(t, 1)
-	mb2 := NewMockBroker(t, 2)
-	mb3 := NewMockBroker(t, 3)
-	defer mb1.Close()
-	defer mb2.Close()
-	defer mb3.Close()
-
-	mdr := new(MetadataResponse)
-	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddBroker(mb3.Addr(), mb3.BrokerID())
-	mdr.AddTopicPartition("topic_a", 0, 2)
-	mdr.AddTopicPartition("topic_b", 0, 3)
-	mdr.AddTopicPartition("topic_c", 0, 3)
-	mb1.Returns(mdr)
-
-	pr1 := new(ProduceResponse)
-	pr1.AddTopicPartition("topic_a", 0, NoError)
-	mb2.Returns(pr1)
-
-	pr2 := new(ProduceResponse)
-	pr2.AddTopicPartition("topic_b", 0, NoError)
-	pr2.AddTopicPartition("topic_c", 0, NoError)
-	mb3.Returns(pr2)
-
-	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	producer, err := NewProducer(client, defaultProducerConfig())
-	if err != nil {
-		t.Fatal(err)
+	for i := 0; i < 10; i++ {
+		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
-	defer producer.Close()
-
-	// flush only on 10th and final message
-	returns := []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
-	for _, f := range returns {
-		sendMessage(t, producer, "topic_a", TestMessage, f)
+	for i := 0; i < 10; i++ {
+		msg := <-producer.Errors()
+		if msg.Err != nil {
+			t.Error(err)
+		}
 	}
+}
 
-	// no flushes
-	returns = []int{0, 0, 0, 0, 0}
-	for _, f := range returns {
-		sendMessage(t, producer, "topic_b", TestMessage, f)
-	}
+func TestProducerMultipleFlushes(t *testing.T) {
+	t.Skip("TODO")
+}
 
-	// flush both topic_b and topic_c on 5th (ie. 10th for this broker)
-	returns = []int{0, 0, 0, 0, 2}
-	for _, f := range returns {
-		sendMessage(t, producer, "topic_c", TestMessage, f)
-	}
+func TestProducerMultipleBrokers(t *testing.T) {
+	t.Skip("TODO")
 }
 
 // Here we test that when two messages are sent in the same buffered request,
 // and more messages are enqueued while the request is pending, everything
 // happens correctly; that is, the first messages are retried before the next
 // batch is allowed to submit.
-func TestFailureRetry(t *testing.T) {
-	t.Skip("not yet working after mockbroker refactor")
-
-	mb1 := NewMockBroker(t, 1)
-	mb2 := NewMockBroker(t, 2)
-	mb3 := NewMockBroker(t, 3)
-
-	mdr := new(MetadataResponse)
-	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddBroker(mb3.Addr(), mb3.BrokerID())
-	mdr.AddTopicPartition("topic_a", 0, 2)
-	mdr.AddTopicPartition("topic_b", 0, 3)
-	mdr.AddTopicPartition("topic_c", 0, 3)
-	mb1.Returns(mdr)
-
-	/* mb1.ExpectMetadataRequest(). */
-	/* 	AddBroker(mb2). */
-	/* 	AddBroker(mb3). */
-	/* 	AddTopicPartition("topic_a", 0, 2). */
-	/* 	AddTopicPartition("topic_b", 0, 2). */
-	/* 	AddTopicPartition("topic_c", 0, 3) */
-
-	pr := new(ProduceResponse)
-	pr.AddTopicPartition("topic_a", 0, NoError)
-	pr.AddTopicPartition("topic_b", 0, NotLeaderForPartition)
-	mb2.Returns(pr)
-
-	/* mb2.ExpectProduceRequest(). */
-	/* 	AddTopicPartition("topic_a", 0, 1, NoError). */
-	/* 	AddTopicPartition("topic_b", 0, 1, NotLeaderForPartition) */
-
-	// The fact that mb2 is chosen here is not well-defined. In theory,
-	// it's a random choice between mb1, mb2, and mb3. Go's hash iteration
-	// isn't quite as random as claimed, though, it seems. Maybe because
-	// the same random seed is used each time?
-	mdr2 := new(MetadataResponse)
-	mdr2.AddBroker(mb3.Addr(), mb3.BrokerID())
-	mdr2.AddTopicPartition("topic_b", 0, 3)
-	mb2.Returns(mdr2)
-
-	/* mb2.ExpectMetadataRequest(). */
-	/* 	AddBroker(mb3). */
-	/* 	AddTopicPartition("topic_b", 0, 3) */
-
-	pr2 := new(ProduceResponse)
-	pr2.AddTopicPartition("topic_c", 0, NoError)
-	pr2.AddTopicPartition("topic_b", 0, NoError)
-	mb3.Returns(pr2)
-
-	/* mb3.ExpectProduceRequest(). */
-	/* 	AddTopicPartition("topic_c", 0, 1, NoError). */
-	/* 	AddTopicPartition("topic_b", 0, 1, NoError) */
-
-	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer client.Close()
-
-	producer, err := NewProducer(client, defaultProducerConfig())
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer producer.Close()
-
-	// Sent to mb3; does not flush because it's only half the cap.
-	// mb1: [__]
-	// mb2: [__]
-	// mb3: [__]
-	sendMessage(t, producer, "topic_c", TestMessage, 0)
-	// mb1: [__]
-	// mb2: [__]
-	// mb3: [X_]
-
-	// Sent to mb2; does not flush because it's only half the cap.
-	sendMessage(t, producer, "topic_a", TestMessage, 0)
-	// mb1: [__]
-	// mb2: [X_]
-	// mb3: [X_]
-
-	// Sent to mb2; flushes, errors (retriable).
-	// Three messages will be received:
-	//   * NoError for topic_a;
-	//   * NoError for topic_b;
-	//   * NoError for topic_c.
-	sendMessage(t, producer, "topic_b", TestMessage, 2)
-	// mb1: [__]
-	// mb2: [XX] <- flush!
-	// mb3: [X_]
-
-	// The topic_b message errors, and we should wind up here:
-
-	// mb1: [__]
-	// mb2: [__]
-	// mb3: [XX] <- topic_b reassigned to mb3 by metadata refresh, flushes.
-
-	defer mb1.Close()
-	defer mb2.Close()
-}
-
-func readMessage(t *testing.T, ch chan error) {
-	select {
-	case err := <-ch:
-		if err != nil {
-			t.Error(err)
-		}
-	case <-time.After(1 * time.Second):
-		t.Error(fmt.Errorf("Message was never received"))
-	}
-}
-
-func assertNoMessages(t *testing.T, ch chan error) {
-	select {
-	case x := <-ch:
-		t.Fatal(fmt.Errorf("unexpected value received: %#v", x))
-	case <-time.After(1 * time.Millisecond):
-	}
+func TestProducerFailureRetry(t *testing.T) {
+	t.Skip("TODO")
 }
 
 func ExampleProducer() {
@@ -323,15 +121,17 @@ func ExampleProducer() {
 	}
 	defer producer.Close()
 
-	err = producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
-	if err != nil {
-		panic(err)
-	} else {
-		fmt.Println("> message sent")
+	for {
+		select {
+		case producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
+			fmt.Println("> message queued")
+		case err := <-producer.Errors():
+			panic(err)
+		}
 	}
 }
 
-func ExampleAsyncProducer() {
+func ExampleSimpleProducer() {
 	client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
 	if err != nil {
 		panic(err)
@@ -340,20 +140,14 @@ func ExampleAsyncProducer() {
 	}
 	defer client.Close()
 
-	producer, err := NewProducer(client, nil)
+	producer, err := NewSimpleProducer(client, "my_topic", nil)
 	if err != nil {
 		panic(err)
 	}
 	defer producer.Close()
 
-	go func() {
-		for err := range producer.Errors() {
-			panic(err)
-		}
-	}()
-
 	for {
-		err = producer.QueueMessage("my_topic", nil, StringEncoder("testing 123"))
+		err = producer.SendMessage(nil, StringEncoder("testing 123"))
 		if err != nil {
 			panic(err)
 		} else {
@@ -361,22 +155,3 @@ func ExampleAsyncProducer() {
 		}
 	}
 }
-
-func sendMessage(t *testing.T, producer *Producer, topic string, key string, expectedResponses int) {
-	err := producer.QueueMessage(topic, nil, StringEncoder(key))
-	if err != nil {
-		t.Error(err)
-	}
-	for i := 0; i < expectedResponses; i++ {
-		readMessage(t, producer.Errors())
-	}
-	assertNoMessages(t, producer.Errors())
-}
-
-func sendSyncMessage(t *testing.T, producer *Producer, topic string, key string) {
-	err := producer.SendMessage(topic, nil, StringEncoder(key))
-	if err != nil {
-		t.Error(err)
-	}
-	assertNoMessages(t, producer.Errors())
-}

+ 47 - 0
simple_producer.go

@@ -0,0 +1,47 @@
+package sarama
+
+// SimpleProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
+// and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
+// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
+type SimpleProducer struct {
+	producer *Producer
+	topic    string
+}
+
+// NewSimpleProducer creates a new SimpleProducer using the given client, topic and partitioner. If the
+// partitioner is nil, messages are partitioned randomly.
+func NewSimpleProducer(client *Client, topic string, partitioner Partitioner) (*SimpleProducer, error) {
+	if topic == "" {
+		return nil, ConfigurationError("Empty topic")
+	}
+
+	config := NewProducerConfig()
+	config.AckSuccesses = true
+	if partitioner != nil {
+		config.Partitioner = partitioner
+	}
+
+	prod, err := NewProducer(client, config)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return &SimpleProducer{prod, topic}, nil
+}
+
+// SendMessage produces a message with the given key and value. To send strings as either key or value, see the StringEncoder type.
+func (sp *SimpleProducer) SendMessage(key, value Encoder) error {
+	sp.producer.Input() <- &MessageToSend{Topic: sp.topic, Key: key, Value: value}
+
+	result := <-sp.producer.Errors() // we always get something because AckSuccesses is true
+
+	return result.Err
+}
+
+// Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
+// a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
+// on the underlying client.
+func (sp *SimpleProducer) Close() error {
+	return sp.producer.Close()
+}
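
SimpleProducer restores a blocking, one-call-per-message API on top of the channel-based producer; because AckSuccesses is forced on, each SendMessage gets exactly one response back. A hedged usage sketch (the client is assumed to exist already; topic and payloads are placeholders):

	func simpleSketch(client *Client) error {
		producer, err := NewSimpleProducer(client, "my_topic", nil) // nil partitioner = random
		if err != nil {
			return err
		}
		defer producer.Close()

		// blocks until the broker responds or delivery fails
		return producer.SendMessage(StringEncoder("key"), StringEncoder("value"))
	}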

+ 11 - 1
utils.go

@@ -28,9 +28,11 @@ func withRecover(fn func()) {
 }
 
 // Encoder is a simple interface for any type that can be encoded as an array of bytes
-// in order to be sent as the key or value of a Kafka message.
+// in order to be sent as the key or value of a Kafka message. Length() is provided as an
+// optimization, and must return the same as len() on the result of Encode().
 type Encoder interface {
 	Encode() ([]byte, error)
+	Length() int
 }
 
 // make strings and byte slices encodable for convenience so they can be used as keys
@@ -44,6 +46,10 @@ func (s StringEncoder) Encode() ([]byte, error) {
 	return []byte(s), nil
 }
 
+func (s StringEncoder) Length() int {
+	return len(s)
+}
+
 // ByteEncoder implements the Encoder interface for Go byte slices so that you can do things like
 //	producer.SendMessage(nil, sarama.ByteEncoder([]byte{0x00}))
 type ByteEncoder []byte
@@ -51,3 +57,7 @@ type ByteEncoder []byte
 func (b ByteEncoder) Encode() ([]byte, error) {
 	return b, nil
 }
+
+func (b ByteEncoder) Length() int {
+	return len(b)
+}
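
Length() lets the producer size batches and enforce MaxMessageBytes without encoding every message twice, so a custom Encoder must keep it consistent with Encode(). A hedged sketch of such an encoder (the type is illustrative and assumes encoding/binary is imported):

	// uint32Encoder renders a uint32 as 4 big-endian bytes.
	// Length must report exactly the length of what Encode returns.
	type uint32Encoder uint32

	func (u uint32Encoder) Encode() ([]byte, error) {
		buf := make([]byte, 4)
		binary.BigEndian.PutUint32(buf, uint32(u))
		return buf, nil
	}

	func (u uint32Encoder) Length() int {
		return 4
	}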