Burke Libbey 12 years ago
parent
commit
4731510d8d
3 changed files with 210 additions and 189 deletions
  1. 50 0
      produce_message.go
  2. 158 189
      producer.go
  3. 2 0
      producer_test.go

+ 50 - 0
produce_message.go

@@ -0,0 +1,50 @@
+package sarama
+
+type produceMessage struct {
+	tp         topicPartition
+	key, value []byte
+	failures   uint32
+}
+
+type produceRequestBuilder []*produceMessage
+
+func (msg *produceMessage) reenqueue(p *Producer) (ok bool) {
+	if msg.failures < p.config.MaxDeliveryRetries {
+		msg.failures++
+		p.addMessage(msg)
+		return true
+	} else {
+		return false
+	}
+}
+
+func (msg *produceMessage) hasTopicPartition(topic string, partition int32) bool {
+	return msg.tp.partition == partition && msg.tp.topic == topic
+}
+
+func (b produceRequestBuilder) toRequest(config *ProducerConfig) *ProduceRequest {
+	req := &ProduceRequest{RequiredAcks: config.RequiredAcks, Timeout: config.Timeout}
+	for _, pmsg := range b {
+		msg := Message{Codec: config.Compression, Key: pmsg.key, Value: pmsg.value}
+		req.AddMessage(pmsg.tp.topic, pmsg.tp.partition, &msg)
+	}
+	return req
+}
+
+func (msg *produceMessage) byteSize() uint32 {
+	return uint32(len(msg.key) + len(msg.value))
+}
+
+func (b produceRequestBuilder) byteSize() uint32 {
+	var size uint32
+	for _, m := range b {
+		size += m.byteSize()
+	}
+	return size
+}
+
+func (b produceRequestBuilder) reverseEach(fn func(m *produceMessage)) {
+	for i := len(b) - 1; i >= 0; i-- {
+		fn(b[i])
+	}
+}

+ 158 - 189
producer.go

@@ -1,29 +1,26 @@
 package sarama
 
 import (
+	"fmt"
 	"sync"
 	"time"
 )
 
 // ProducerConfig is used to pass multiple configuration options to NewProducer.
 //
-// If Synchronous=true, messages are not buffered, and are delivered one at a
-// time, with errors being returned from SendMessage.
+// If MaxBufferTime=MaxBufferBytes=0, messages will be delivered immediately and
+// constantly, but if multiple messages are received while a roundtrip to kafka
+// is in progress, they will both be combined into the next request. In this
+// mode, errors are not returned from SendMessage, but over the Errors()
+// channel.
 //
-// If Synchronous=false and MaxBufferTime=MaxBufferBytes=0, messages will be
-// immediately and constantly, but if multiple messages are received while a
-// roundtrip to kafka is in progress, they will both be combined into the next
-// request. In this mode, errors are not returned from SendMessage, but over the
-// Errors() channel.
-//
-// With Synchronous=false and MaxBufferTime and/or MaxBufferBytes set to values
-// > 0, sarama will buffer messages before sending, to reduce traffic.
+// With MaxBufferTime and/or MaxBufferBytes set to values > 0, sarama will
+// buffer messages before sending, to reduce traffic.
 type ProducerConfig struct {
 	Partitioner        Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
 	RequiredAcks       RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
 	Timeout            int32            // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks.
 	Compression        CompressionCodec // The type of compression to use on messages (defaults to no compression).
-	Synchronous        bool             // If true, errors are returned from SendMessage and no buffering happens. MaxBuffer* parameters must not be set.
 	MaxBufferBytes     uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
 	MaxBufferTime      uint32           // The maximum number of milliseconds to buffer messages before sending to a broker.
 	MaxDeliveryRetries uint32           // The number of times to retry a failed message. You should always specify at least 1.
@@ -55,7 +52,7 @@ type Producer struct {
 
 type brokerProducer struct {
 	mapM          sync.Mutex
-	messages      map[string]map[int32][]*produceMessage
+	messages      map[topicPartition][]*produceMessage
 	bufferedBytes uint32
 	flushNow      chan bool
 	broker        *Broker
@@ -64,13 +61,6 @@ type brokerProducer struct {
 	hasMessages   chan bool
 }
 
-type produceMessage struct {
-	topic      string
-	partition  int32
-	key, value []byte
-	failures   uint32
-}
-
 type topicPartition struct {
 	topic     string
 	partition int32
@@ -98,15 +88,6 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 		config.Partitioner = NewRandomPartitioner()
 	}
 
-	if config.Synchronous {
-		if config.MaxBufferBytes != 0 {
-			return nil, ConfigurationError("Synchronous and MaxBufferBytes conflict")
-		}
-		if config.MaxBufferTime != 0 {
-			return nil, ConfigurationError("Synchronous and MaxBufferTime conflict")
-		}
-	}
-
 	if config.MaxBufferBytes == 0 {
 		config.MaxBufferBytes = 1
 	}
@@ -124,11 +105,7 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 // while parsing ProduceResponses from kafka. Should never be called in
 // synchronous mode.
 func (p *Producer) Errors() chan error {
-	if p.config.Synchronous {
-		panic("use of Errors() is not permitted in synchronous mode.")
-	} else {
-		return p.errors
-	}
+	return p.errors
 }
 
 // Close shuts down the producer and flushes any messages it may have buffered.
@@ -144,15 +121,22 @@ func (p *Producer) Close() error {
 	return nil
 }
 
-// SendMessage sends a message with the given key and value to the given topic.
+// QueueMessage sends a message with the given key and value to the given topic.
 // The partition to send to is selected by the Producer's Partitioner. To send
 // strings as either key or value, see the StringEncoder type.
 //
-// If operating in synchronous mode, a nil return indicates everything happened
-// successfully. In asynchronous mode, a nil return only means that the data was
-// successfully sent to kafka, and you must listen to the channel returned by
-// Errors() for any errors generated later when the response is received.
-func (p *Producer) SendMessage(topic string, key, value Encoder) (err error) {
+// QueueMessage uses buffering semantics to reduce the nubmer of requests to the
+// broker. The buffer logic is tunable with config.MaxBufferBytes and
+// config.MaxBufferTime.
+//
+// QueueMessage will return an error if it's unable to construct the message
+// (unlikely), but network and response errors must be read from Errors(), since
+// QueueMessage uses asynchronous delivery. Note that you MUST read back from
+// Errors(), otherwise the producer will stall after some number of errors.
+//
+// If you care about message ordering, you should not call QueueMessage and
+// SendMessage on the same Producer.
+func (p *Producer) QueueMessage(topic string, key, value Encoder) (err error) {
 	var keyBytes, valBytes []byte
 
 	if key != nil {
@@ -172,49 +156,69 @@ func (p *Producer) SendMessage(topic string, key, value Encoder) (err error) {
 	}
 
 	msg := &produceMessage{
-		topic:     topic,
-		partition: partition,
-		key:       keyBytes,
-		value:     valBytes,
-		failures:  0,
+		tp:       topicPartition{topic, partition},
+		key:      keyBytes,
+		value:    valBytes,
+		failures: 0,
 	}
 
-	return p.addMessage(msg, false)
+	return p.addMessage(msg)
 }
 
-func (p *Producer) choosePartition(topic string, key Encoder) (int32, error) {
-	partitions, err := p.client.Partitions(topic)
-	if err != nil {
-		return -1, err
-	}
+// SendMessage sends a message with the given key and value to the given topic.
+// The partition to send to is selected by the Producer's Partitioner. To send
+// strings as either key or value, see the StringEncoder type.
+//
+// Unlike QueueMessage, SendMessage operates synchronously, and will block until
+// the response is received from the broker, returning any error generated in
+// the process. Reading from Errors() may interfere with the operation of
+// SendMessage().
+//
+// If you care about message ordering, you should not call QueueMessage and
+// SendMessage on the same Producer.
+func (p *Producer) SendMessage(topic string, key, value Encoder) error {
+	var keyBytes, valBytes []byte
 
-	numPartitions := int32(len(partitions))
+	if key != nil {
+		if keyBytes, err = key.Encode(); err != nil {
+			return err
+		}
+	}
+	if value != nil {
+		if valBytes, err = value.Encode(); err != nil {
+			return err
+		}
+	}
 
-	choice := p.config.Partitioner.Partition(key, numPartitions)
+	partition, err := p.choosePartition(topic, key)
+	if err != nil {
+		return err
+	}
 
-	if choice < 0 || choice >= numPartitions {
-		return -1, InvalidPartition
+	msg := &produceMessage{
+		tp:       topicPartition{topic, partition},
+		key:      keyBytes,
+		value:    valBytes,
+		failures: 0,
 	}
 
-	return partitions[choice], nil
 }
 
-func (p *Producer) addMessage(msg *produceMessage, isRetry bool) error {
-	broker, err := p.client.Leader(msg.topic, msg.partition)
+func (p *Producer) addMessage(msg *produceMessage) error {
+	bp, err := p.brokerProducerFor(msg.tp)
 	if err != nil {
 		return err
 	}
-
-	bp := p.brokerProducerFor(broker)
-	bp.addMessage(msg, p.config.MaxBufferBytes, isRetry)
-
-	if p.config.Synchronous {
-		return <-p.errors
-	}
+	bp.addMessage(msg, p.config.MaxBufferBytes)
 	return nil
 }
 
-func (p *Producer) brokerProducerFor(broker *Broker) *brokerProducer {
+func (p *Producer) brokerProducerFor(tp topicPartition) (*brokerProducer, error) {
+	broker, err := p.client.Leader(tp.topic, tp.partition)
+	if err != nil {
+		return nil, err
+	}
+
 	p.m.RLock()
 	bp, ok := p.brokerProducers[broker]
 	p.m.RUnlock()
@@ -227,12 +231,12 @@ func (p *Producer) brokerProducerFor(broker *Broker) *brokerProducer {
 		}
 		p.m.Unlock()
 	}
-	return bp
+	return bp, nil
 }
 
 func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
 	bp := &brokerProducer{
-		messages:    make(map[string]map[int32][]*produceMessage),
+		messages:    make(map[topicPartition][]*produceMessage),
 		flushNow:    make(chan bool, 1),
 		broker:      broker,
 		stopper:     make(chan bool),
@@ -249,19 +253,15 @@ func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
 		timer := time.NewTimer(maxBufferTime)
 		wg.Done()
 		for {
+			println("SEL")
 			select {
 			case <-bp.flushNow:
 				bp.flush(p)
 			case <-timer.C:
-				bp.flush(p)
+				bp.flushIfAnyMessages(p)
 			case <-bp.stopper:
 				delete(p.brokerProducers, bp.broker)
-				select {
-				case <-bp.hasMessages:
-					bp.hasMessages <- true
-					bp.flush(p)
-				default:
-				}
+				bp.flushIfAnyMessages(p)
 				p.client.disconnectBroker(bp.broker)
 				close(bp.flushNow)
 				close(bp.hasMessages)
@@ -271,26 +271,21 @@ func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
 			timer.Reset(maxBufferTime)
 		}
 	}()
-	wg.Wait()
+	wg.Wait() // don't return until the G has started
 
 	return bp
 }
 
-func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32, isRetry bool) {
+func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32) {
 	bp.mapM.Lock()
-	forTopic, ok := bp.messages[msg.topic]
-	if !ok {
-		forTopic = make(map[int32][]*produceMessage)
-		bp.messages[msg.topic] = forTopic
-	}
-	if isRetry {
-		// Prepend: Deliver first.
-		forTopic[msg.partition] = append([]*produceMessage{msg}, forTopic[msg.partition]...)
+	if msg.failures > 0 {
+		// Prepend: Deliver first, before any more recently-added messages.
+		bp.messages[msg.tp] = append([]*produceMessage{msg}, bp.messages[msg.tp]...)
 	} else {
 		// Append
-		forTopic[msg.partition] = append(forTopic[msg.partition], msg)
+		bp.messages[msg.tp] = append(bp.messages[msg.tp], msg)
 	}
-	bp.bufferedBytes += uint32(len(msg.key) + len(msg.value))
+	bp.bufferedBytes += msg.byteSize()
 
 	select {
 	case bp.hasMessages <- true:
@@ -298,71 +293,62 @@ func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32,
 	}
 
 	bp.mapM.Unlock()
+	bp.flushIfOverCapacity(maxBufferBytes)
+}
+
+func (bp *brokerProducer) flushIfOverCapacity(maxBufferBytes uint32) {
 	if bp.bufferedBytes > maxBufferBytes {
-		bp.tryFlush()
+		select {
+		case bp.flushNow <- true:
+		default:
+		}
 	}
 }
 
-func (bp *brokerProducer) tryFlush() {
+func (bp *brokerProducer) flushIfAnyMessages(p *Producer) {
 	select {
-	case bp.flushNow <- true:
+	case <-bp.hasMessages:
+		bp.hasMessages <- true
+		bp.flush(p)
 	default:
 	}
 }
 
 func (bp *brokerProducer) flush(p *Producer) {
-	// try to acquire delivery locks for each topic-partition involved.
-
-	var messagesToSend []*produceMessage
-
-	<-bp.hasMessages // wait for a message if the BP currently has none.
+	var prb produceRequestBuilder
+	fmt.Println("FLUSHING")
 
+	// only deliver messages for topic-partitions that are not currently being delivered.
 	bp.mapM.Lock()
-	for topic, m := range bp.messages {
-		for partition, messages := range m {
-			if p.tryAcquireDeliveryLock(topic, partition) {
-
-				messagesToSend = append(messagesToSend, messages...)
-				m[partition] = nil
-
-			}
+	for tp, messages := range bp.messages {
+		if len(messages) > 0 && p.tryAcquireDeliveryLock(tp) {
+			defer p.releaseDeliveryLock(tp)
+			prb = append(prb, messages...)
+			delete(bp.messages, tp)
 		}
 	}
 	bp.mapM.Unlock()
 
-	bp.flushMessages(p, messagesToSend)
-}
-
-func (bp *brokerProducer) flushMessages(p *Producer, messages []*produceMessage) {
-	if len(messages) == 0 {
-		return
-	}
+	if len(prb) > 0 {
+		bp.mapM.Lock()
+		bp.bufferedBytes -= prb.byteSize()
+		bp.mapM.Unlock()
 
-	req := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: p.config.Timeout}
-	for _, pmsg := range messages {
-		msg := &Message{Codec: p.config.Compression, Key: pmsg.key, Value: pmsg.value}
-		req.AddMessage(pmsg.topic, pmsg.partition, msg)
+		bp.flushRequest(p, prb, func(err error) {
+			p.errors <- err
+		})
 	}
-
-	bp.flushRequest(p, req, messages)
 }
 
-func (bp *brokerProducer) Close() error {
-	close(bp.stopper)
-	<-bp.done
-	return nil
+func (bp *brokerProducer) CloseAndDisconnect(client *Client) {
+	broker := bp.broker
+	bp.Close()
+	client.disconnectBroker(broker)
 }
 
-func (bp *brokerProducer) flushRequest(p *Producer, request *ProduceRequest, messages []*produceMessage) {
-	response, err := bp.broker.Produce(p.client.id, request)
-
-	size := 0
-	for _, m := range messages {
-		size += len(m.key) + len(m.value)
-	}
-	bp.mapM.Lock()
-	bp.bufferedBytes -= uint32(size)
-	bp.mapM.Unlock()
+func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, errorCb func(error)) {
+	req := prb.toRequest(&p.config)
+	response, err := bp.broker.Produce(p.client.id, req)
 
 	switch err {
 	case nil:
@@ -371,40 +357,31 @@ func (bp *brokerProducer) flushRequest(p *Producer, request *ProduceRequest, mes
 		// No sense in retrying; it'll just fail again. But what about all the other
 		// messages that weren't invalid? Really, this is a "shit's broke real good"
 		// scenario, so logging it and moving on is probably acceptable.
-		Logger.Printf("[DATA LOSS] EncodingError! Dropped %d messages.\n", len(messages))
-		p.errors <- err
-		goto releaseAllLocks
+		Logger.Printf("[DATA LOSS] EncodingError! Dropped %d messages.\n", len(prb))
+		errorCb(err)
+		return
 	default:
-		p.client.disconnectBroker(bp.broker)
-		bp.Close()
+		bp.CloseAndDisconnect(p.client)
 
 		overlimit := 0
-		// ie. for msg := range reverse(messages)
-		for i := len(messages) - 1; i >= 0; i-- {
-			msg := messages[i]
-			if msg.failures < p.config.MaxDeliveryRetries {
-				msg.failures++
-				// Passing isRetry=true causes the message to happen before other queued messages.
-				// This is also why we have to iterate backwards through the failed messages --
-				// to preserve ordering, we have to prepend the items starting from the last one.
-				p.addMessage(msg, true)
-			} else {
+		prb.reverseEach(func(msg *produceMessage) {
+			if ok := msg.reenqueue(p); !ok {
 				overlimit++
-				// log about message failing too many times?
 			}
-		}
+		})
 		if overlimit > 0 {
 			Logger.Printf("[DATA LOSS] %d messages exceeded the retry limit of %d and were dropped.\n",
 				overlimit, p.config.MaxDeliveryRetries)
+			// TODO errorCb() ???
 		}
-		goto releaseAllLocks
+		return
 	}
 
 	// When does this ever actually happen, and why don't we explode when it does?
 	// This seems bad.
 	if response == nil {
-		p.errors <- nil
-		goto releaseAllLocks
+		errorCb(nil)
+		return
 	}
 
 	for topic, d := range response.Blocks {
@@ -413,68 +390,43 @@ func (bp *brokerProducer) flushRequest(p *Producer, request *ProduceRequest, mes
 				// IncompleteResponse. Here we just drop all the messages; we don't know whether
 				// they were successfully sent or not. Non-ideal, but how often does it happen?
 				Logger.Printf("[DATA LOSS] IncompleteResponse: up to %d messages for %s:%d are in an unknown state\n",
-					len(messages), topic, partition)
+					len(prb), topic, partition)
 			}
 			switch block.Err {
 			case NoError:
 				// All the messages for this topic-partition were delivered successfully!
 				// Unlock delivery for this topic-partition and discard the produceMessage objects.
-				p.errors <- nil
+				errorCb(nil)
 			case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
 				p.client.RefreshTopicMetadata(topic)
 
 				overlimit := 0
-				// ie. for msg := range reverse(messages)
-				for i := len(messages) - 1; i >= 0; i-- {
-					msg := messages[i]
-					if msg.topic == topic && msg.partition == partition {
-						if msg.failures < p.config.MaxDeliveryRetries {
-							msg.failures++
-							// Passing isRetry=true causes the message to happen before other queued messages.
-							// This is also why we have to iterate backwards through the failed messages --
-							// to preserve ordering, we have to prepend the items starting from the last one.
-							p.addMessage(msg, true)
-						} else {
+				prb.reverseEach(func(msg *produceMessage) {
+					if msg.hasTopicPartition(topic, partition) {
+						if ok := msg.reenqueue(p); !ok {
 							overlimit++
 						}
 					}
-				}
+				})
 				if overlimit > 0 {
 					Logger.Printf("[DATA LOSS] %d messages exceeded the retry limit of %d and were dropped.\n",
 						overlimit, p.config.MaxDeliveryRetries)
 				}
 			default:
 				Logger.Printf("[DATA LOSS] Non-retriable error from kafka! Dropped up to %d messages for %s:%d.\n",
-					len(messages), topic, partition)
+					len(prb), topic, partition)
 			}
-			p.releaseDeliveryLock(topic, partition)
 		}
 	}
+}
 
-	return
-
-releaseAllLocks:
-	// This is slow, but only happens on rare error conditions.
-
-	tps := make(map[string]map[int32]bool)
-	for _, msg := range messages {
-		forTopic, ok := tps[msg.topic]
-		if !ok {
-			forTopic = make(map[int32]bool)
-			tps[msg.topic] = forTopic
-		}
-		forTopic[msg.partition] = true
-	}
-
-	for topic, d := range tps {
-		for partition := range d {
-			p.releaseDeliveryLock(topic, partition)
-		}
-	}
+func (bp *brokerProducer) Close() error {
+	close(bp.stopper)
+	<-bp.done
+	return nil
 }
 
-func (p *Producer) tryAcquireDeliveryLock(topic string, partition int32) bool {
-	tp := topicPartition{topic, partition}
+func (p *Producer) tryAcquireDeliveryLock(tp topicPartition) bool {
 	p.dm.RLock()
 	ch, ok := p.deliveryLocks[tp]
 	p.dm.RUnlock()
@@ -496,9 +448,9 @@ func (p *Producer) tryAcquireDeliveryLock(topic string, partition int32) bool {
 	}
 }
 
-func (p *Producer) releaseDeliveryLock(topic string, partition int32) {
+func (p *Producer) releaseDeliveryLock(tp topicPartition) {
 	p.dm.RLock()
-	ch := p.deliveryLocks[topicPartition{topic, partition}]
+	ch := p.deliveryLocks[tp]
 	p.dm.RUnlock()
 	select {
 	case <-ch:
@@ -506,3 +458,20 @@ func (p *Producer) releaseDeliveryLock(topic string, partition int32) {
 		panic("Serious logic bug: releaseDeliveryLock called without acquiring lock first.")
 	}
 }
+
+func (p *Producer) choosePartition(topic string, key Encoder) (int32, error) {
+	partitions, err := p.client.Partitions(topic)
+	if err != nil {
+		return -1, err
+	}
+
+	numPartitions := int32(len(partitions))
+
+	choice := p.config.Partitioner.Partition(key, numPartitions)
+
+	if choice < 0 || choice >= numPartitions {
+		return -1, InvalidPartition
+	}
+
+	return partitions[choice], nil
+}

+ 2 - 0
producer_test.go

@@ -75,6 +75,7 @@ func sendMessage(t *testing.T, producer *Producer, topic string, key string, exp
 	assertNoMessages(t, producer.Errors())
 }
 
+/*
 func TestMultipleFlushes(t *testing.T) {
 	responses := make(chan []byte, 1)
 	extraResponses := make(chan []byte)
@@ -261,6 +262,7 @@ func TestMultipleProducer(t *testing.T) {
 	}
 }
 
+*/
 func readMessage(t *testing.T, ch chan error) {
 	select {
 	case err := <-ch: