
Merge pull request #538 from Shopify/handle-unencodable-messages

Correctly handle unencodable messages in the producer
Evan Huus 10 years ago
parent
commit
53f2a54a5c
2 changed files with 78 additions and 17 deletions
  1. async_producer.go (+22 -16)
  2. async_producer_test.go (+56 -1)

+ 22 - 16
async_producer.go

@@ -137,6 +137,8 @@ type ProducerMessage struct {
 
 	retries int
 	flags   flagSet
+
+	keyCache, valueCache []byte
 }
 
 func (m *ProducerMessage) byteSize() int {
@@ -153,6 +155,8 @@ func (m *ProducerMessage) byteSize() int {
 func (m *ProducerMessage) clear() {
 	m.flags = 0
 	m.retries = 0
+	m.keyCache = nil
+	m.valueCache = nil
 }
 
 // ProducerError is the type of error generated when the producer fails to deliver a message.
@@ -678,6 +682,7 @@ func (f *flusher) run() {
 }
 
 func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage {
+	var err error
 	msgSets := make(map[string]map[int32][]*ProducerMessage)
 
 	for i, msg := range batch {
@@ -697,6 +702,22 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32]
 			continue
 		}
 
+		if msg.Key != nil {
+			if msg.keyCache, err = msg.Key.Encode(); err != nil {
+				f.parent.returnError(msg, err)
+				batch[i] = nil
+				continue
+			}
+		}
+
+		if msg.Value != nil {
+			if msg.valueCache, err = msg.Value.Encode(); err != nil {
+				f.parent.returnError(msg, err)
+				batch[i] = nil
+				continue
+			}
+		}
+
 		partitionSet := msgSets[msg.Topic]
 		if partitionSet == nil {
 			partitionSet = make(map[int32][]*ProducerMessage)
@@ -804,21 +825,6 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa
 			setToSend := new(MessageSet)
 			setSize := 0
 			for _, msg := range msgSet {
-				var keyBytes, valBytes []byte
-				var err error
-				if msg.Key != nil {
-					if keyBytes, err = msg.Key.Encode(); err != nil {
-						p.returnError(msg, err)
-						continue
-					}
-				}
-				if msg.Value != nil {
-					if valBytes, err = msg.Value.Encode(); err != nil {
-						p.returnError(msg, err)
-						continue
-					}
-				}
-
 				if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes {
 					// compression causes message-sets to be wrapped as single messages, which have tighter
 					// size requirements, so we have to respect those limits
@@ -833,7 +839,7 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa
 				}
 				setSize += msg.byteSize()
 
-				setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes})
+				setToSend.addMessage(&Message{Codec: CompressionNone, Key: msg.keyCache, Value: msg.valueCache})
 				empty = false
 			}
 

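With this change the key and value are encoded up front in groupAndFilter, so a message whose Encoder fails is removed from the batch there and its error is delivered on the Errors() channel. A minimal caller-side sketch, assuming a broker at localhost:9092 and the public github.com/Shopify/sarama import path (neither is part of this commit):

    package main

    import (
        "errors"
        "log"

        "github.com/Shopify/sarama"
    )

    // badEncoder is a hypothetical Encoder whose Encode always fails,
    // standing in for a value that cannot be serialized.
    type badEncoder struct{}

    func (badEncoder) Length() int             { return 0 }
    func (badEncoder) Encode() ([]byte, error) { return nil, errors.New("cannot encode") }

    func main() {
        producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, sarama.NewConfig())
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close()

        producer.Input() <- &sarama.ProducerMessage{Topic: "my_topic", Value: badEncoder{}}

        // The encoding failure comes back as a ProducerError; the message is
        // filtered out of the batch before the produce request is built.
        perr := <-producer.Errors()
        log.Printf("delivery failed: %v", perr.Err)
    }
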
+ 56 - 1
async_producer_test.go

@@ -33,13 +33,15 @@ func closeProducer(t *testing.T, p AsyncProducer) {
 }
 
 func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
-	for successes > 0 || errors > 0 {
+	expect := successes + errors
+	for expect > 0 {
 		select {
 		case msg := <-p.Errors():
 			if msg.Msg.flags != 0 {
 				t.Error("Message had flags set")
 			}
 			errors--
+			expect--
 			if errors < 0 {
 				t.Error(msg.Err)
 			}
@@ -48,11 +50,15 @@ func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
 				t.Error("Message had flags set")
 			}
 			successes--
+			expect--
 			if successes < 0 {
 				t.Error("Too many successes")
 			}
 		}
 	}
+	if successes != 0 || errors != 0 {
+		t.Error("Unexpected successes", successes, "or errors", errors)
+	}
 }
 
 type testPartitioner chan *int32
@@ -74,6 +80,19 @@ func (p testPartitioner) feed(partition int32) {
 	p <- &partition
 }
 
+type flakyEncoder bool
+
+func (f flakyEncoder) Length() int {
+	return len(TestMessage)
+}
+
+func (f flakyEncoder) Encode() ([]byte, error) {
+	if !bool(f) {
+		return nil, errors.New("flaky encoding error")
+	}
+	return []byte(TestMessage), nil
+}
+
 func TestAsyncProducer(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 2)
@@ -285,6 +304,42 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
 	closeProducer(t, producer)
 }
 
+func TestAsyncProducerEncoderFailures(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader.Returns(prodSuccess)
+	leader.Returns(prodSuccess)
+	leader.Returns(prodSuccess)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 3
+	config.Producer.Return.Successes = true
+	config.Producer.Partitioner = NewManualPartitioner
+	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for flush := 0; flush < 3; flush++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
+		expectResults(t, producer, 1, 2)
+	}
+
+	closeProducer(t, producer)
+	leader.Close()
+	seedBroker.Close()
+}
+
 // If a Kafka broker becomes unavailable and then returns back in service, then
 // producer reconnects to it and continues sending messages.
 func TestAsyncProducerBrokerBounce(t *testing.T) {
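
The test's flakyEncoder fails on demand; in application code the same situation arises with any Encoder whose Encode can legitimately fail. A sketch of one such encoder, assuming the public github.com/Shopify/sarama import path and a hypothetical package name (not part of this commit):

    package kafkautil // hypothetical helper package

    import (
        "encoding/json"

        "github.com/Shopify/sarama"
    )

    // jsonEncoder marshals v lazily; json.Marshal can fail (for example on
    // values containing channels or funcs), and with this change that error
    // is returned to the caller as a ProducerError.
    type jsonEncoder struct {
        v       interface{}
        encoded []byte
        err     error
    }

    func (e *jsonEncoder) Encode() ([]byte, error) {
        if e.encoded == nil && e.err == nil {
            e.encoded, e.err = json.Marshal(e.v)
        }
        return e.encoded, e.err
    }

    func (e *jsonEncoder) Length() int {
        b, _ := e.Encode()
        return len(b)
    }

    // Usage: producer.Input() <- &sarama.ProducerMessage{Topic: "t", Value: &jsonEncoder{v: payload}}
    var _ sarama.Encoder = (*jsonEncoder)(nil) // compile-time interface check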