Browse Source

Fix handling of unencodable messages

Move the `Key.Encode()` and `Value.Encode()` calls slightly earlier (into
`groupAndFilter`), where we have access to the batch, so that messages which
fail to encode can be removed from consideration. Otherwise, failed messages
would not be removed from the batch and could end up being returned twice.

Add cache members to the ProducerMessage struct to store the results until we
actually need them.

Fixes #449.

This is perhaps not the most elegant solution. However, it is correct, and a
better solution would be a lot more invasive. This will do in order to ship the
fix in a 1.5.1 patch release.
Evan Huus 10 years ago
parent
commit
2840a3795a
1 changed files with 22 additions and 16 deletions
  1. 22 16
      async_producer.go

+ 22 - 16
async_producer.go

@@ -119,6 +119,8 @@ type ProducerMessage struct {
 
 	retries int
 	flags   flagSet
+
+	keyCache, valueCache []byte
 }
 
 func (m *ProducerMessage) byteSize() int {
@@ -135,6 +137,8 @@ func (m *ProducerMessage) byteSize() int {
 func (m *ProducerMessage) clear() {
 	m.flags = 0
 	m.retries = 0
+	m.keyCache = nil
+	m.valueCache = nil
 }
 
 // ProducerError is the type of error generated when the producer fails to deliver a message.
@@ -660,6 +664,7 @@ func (f *flusher) run() {
 }
 
 func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage {
+	var err error
 	msgSets := make(map[string]map[int32][]*ProducerMessage)
 
 	for i, msg := range batch {
@@ -679,6 +684,22 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32]
 			continue
 		}
 
+		if msg.Key != nil {
+			if msg.keyCache, err = msg.Key.Encode(); err != nil {
+				f.parent.returnError(msg, err)
+				batch[i] = nil
+				continue
+			}
+		}
+
+		if msg.Value != nil {
+			if msg.valueCache, err = msg.Value.Encode(); err != nil {
+				f.parent.returnError(msg, err)
+				batch[i] = nil
+				continue
+			}
+		}
+
 		partitionSet := msgSets[msg.Topic]
 		if partitionSet == nil {
 			partitionSet = make(map[int32][]*ProducerMessage)
@@ -786,21 +807,6 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa
 			setToSend := new(MessageSet)
 			setSize := 0
 			for _, msg := range msgSet {
-				var keyBytes, valBytes []byte
-				var err error
-				if msg.Key != nil {
-					if keyBytes, err = msg.Key.Encode(); err != nil {
-						p.returnError(msg, err)
-						continue
-					}
-				}
-				if msg.Value != nil {
-					if valBytes, err = msg.Value.Encode(); err != nil {
-						p.returnError(msg, err)
-						continue
-					}
-				}
-
 				if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes {
 					// compression causes message-sets to be wrapped as single messages, which have tighter
 					// size requirements, so we have to respect those limits
@@ -815,7 +821,7 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa
 				}
 				setSize += msg.byteSize()
 
-				setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes})
+				setToSend.addMessage(&Message{Codec: CompressionNone, Key: msg.keyCache, Value: msg.valueCache})
 				empty = false
 			}