Burke Libbey 12 years ago
parent
commit
5c9e5571ff
3 changed files with 35 additions and 7 deletions
  1. produce_message.go (+31, -0)
  2. producer.go (+4, -5)
  3. producer_test.go (+0, -2)

produce_message.go (+31, -0)

@@ -1,5 +1,7 @@
 package sarama
 
+import "log"
+
 type produceMessage struct {
 	tp         topicPartition
 	key, value []byte
@@ -44,6 +46,35 @@ func (msg *produceMessage) hasTopicPartition(topic string, partition int32) bool
 
 func (b produceRequestBuilder) toRequest(config *ProducerConfig) *ProduceRequest {
 	req := &ProduceRequest{RequiredAcks: config.RequiredAcks, Timeout: config.Timeout}
+
+	// If compression is enabled, we need to group messages by topic-partition and
+	// wrap them in MessageSets. We already discarded that grouping, so we
+	// inefficiently re-group them here. This could be optimized (e.g. by passing a
+	// map around rather than an array), but it's not clear what the best way is.
+	if config.Compression != CompressionNone {
+		msgSets := make(map[topicPartition]*MessageSet)
+		for _, pmsg := range b {
+			msgSet, ok := msgSets[pmsg.tp]
+			if !ok {
+				msgSet = new(MessageSet)
+				msgSets[pmsg.tp] = msgSet
+			}
+
+			msgSet.addMessage(&Message{Codec: CompressionNone, Key: pmsg.key, Value: pmsg.value})
+		}
+		for tp, msgSet := range msgSets {
+			valBytes, err := encode(msgSet)
+			if err != nil {
+				log.Fatal(err) // if this happens, it's basically our fault.
+			}
+			msg := Message{Codec: config.Compression, Key: nil, Value: valBytes}
+			req.AddMessage(tp.topic, tp.partition, &msg)
+		}
+		return req
+	}
+
+	// Compression is not enabled. Simply append each message directly to the
+	// request, with no MessageSet wrapper.
 	for _, pmsg := range b {
 		msg := Message{Codec: config.Compression, Key: pmsg.key, Value: pmsg.value}
 		req.AddMessage(pmsg.tp.topic, pmsg.tp.partition, &msg)

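The compression branch added above has to re-derive the per-partition grouping that the flat produceRequestBuilder slice discarded before it can wrap each group in a compressed MessageSet. Below is a minimal standalone sketch of just that grouping step, using hypothetical stand-in types rather than the library's real ones:

package main

import "fmt"

// Hypothetical stand-ins for the library's topicPartition and produceMessage types.
type topicPartition struct {
	topic     string
	partition int32
}

type produceMessage struct {
	tp         topicPartition
	key, value []byte
}

// groupByTopicPartition rebuilds the per-partition grouping from a flat batch,
// the same way toRequest does before encoding each group as a compressed set.
func groupByTopicPartition(batch []produceMessage) map[topicPartition][]produceMessage {
	groups := make(map[topicPartition][]produceMessage)
	for _, pmsg := range batch {
		groups[pmsg.tp] = append(groups[pmsg.tp], pmsg)
	}
	return groups
}

func main() {
	batch := []produceMessage{
		{tp: topicPartition{"events", 0}, value: []byte("a")},
		{tp: topicPartition{"events", 1}, value: []byte("b")},
		{tp: topicPartition{"events", 0}, value: []byte("c")},
	}
	for tp, group := range groupByTopicPartition(batch) {
		fmt.Printf("%s/%d -> %d messages\n", tp.topic, tp.partition, len(group))
	}
}

In the commit itself, each group is then encoded and placed in a single Message carrying the configured compression codec, so the broker sees one compressed wrapper message per topic-partition.
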
producer.go (+4, -5)

@@ -1,7 +1,7 @@
 package sarama
 
 import (
-	"errors"
+	"fmt"
 	"sync"
 	"time"
 )
@@ -320,8 +320,6 @@ func (bp *brokerProducer) flush(p *Producer) {
 		bp.bufferedBytes -= prb.byteSize()
 		bp.mapM.Unlock()
 
-		// TODO: Compression probably discards messages because they need to be wrapped in a MessageSet or something.
-
 		bp.flushRequest(p, prb, func(err error) {
 			p.errors <- err
 		})
@@ -329,6 +327,7 @@ func (bp *brokerProducer) flush(p *Producer) {
 }
 
 func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, errorCb func(error)) {
+	// toRequest is defined in produce_message.go
 	req := prb.toRequest(&p.config)
 	response, err := bp.broker.Produce(p.client.id, req)
 
@@ -354,7 +353,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 		if overlimit > 0 {
 			Logger.Printf("[DATA LOSS] %d messages exceeded the retry limit of %d and were dropped.\n",
 				overlimit, p.config.MaxDeliveryRetries)
-			// TODO errorCb() ???
+			errorCb(fmt.Errorf("dropped %d messages that exceeded the retry limit", overlimit))
 		}
 		return
 	}
@@ -405,7 +404,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 func (bp *brokerProducer) Close() error {
 	select {
 	case <-bp.stopper:
-		return errors.New("already closed or closing")
+		return fmt.Errorf("already closed or closing")
 	default:
 		close(bp.stopper)
 		<-bp.done

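The producer change replaces the open TODO with a concrete callback: messages that exhaust MaxDeliveryRetries are now surfaced through errorCb instead of only being logged. A minimal standalone sketch of that reporting pattern, with hypothetical names rather than the producer's real internals:

package main

import (
	"fmt"
	"log"
)

// reportDroppedMessages mirrors the shape of the new errorCb call: once the
// number of messages past the retry limit is known, a single formatted error
// is handed to the caller-supplied callback in addition to the log line.
func reportDroppedMessages(overlimit, maxRetries int, errorCb func(error)) {
	if overlimit > 0 {
		log.Printf("[DATA LOSS] %d messages exceeded the retry limit of %d and were dropped.\n",
			overlimit, maxRetries)
		errorCb(fmt.Errorf("dropped %d messages that exceeded the retry limit", overlimit))
	}
}

func main() {
	errs := make(chan error, 1)
	reportDroppedMessages(3, 5, func(err error) { errs <- err })
	fmt.Println(<-errs) // dropped 3 messages that exceeded the retry limit
}

In the producer itself the callback forwards onto p.errors (see the flush change above), so a dropped batch now shows up on the producer's error channel rather than disappearing silently.
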
producer_test.go (+0, -2)

@@ -174,8 +174,6 @@ func TestMultipleProducer(t *testing.T) {
 // happens correctly; that is, the first messages are retried before the next
 // batch is allowed to submit.
 func TestFailureRetry(t *testing.T) {
-	println("=============================================")
-
 	mb1 := NewMockBroker(t, 1)
 	mb2 := NewMockBroker(t, 2)
 	mb3 := NewMockBroker(t, 3)