Browse Source

Respect MaxMessageBytes limit for uncompressed messages

Adam Dratwinski 6 years ago
parent
commit
0017e59f4b
2 changed files with 30 additions and 7 deletions
  1. 2 3
      produce_set.go
  2. 28 4
      produce_set_test.go

+ 2 - 3
produce_set.go

@@ -215,9 +215,8 @@ func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
 	// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
 	case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)):
 		return true
-	// Would we overflow the size-limit of a compressed message-batch for this partition?
-	case ps.parent.conf.Producer.Compression != CompressionNone &&
-		ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
+	// Would we overflow the size-limit of a message-batch for this partition?
+	case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
 		ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
 		return true
 	// Would we overflow simply in number of messages?

+ 28 - 4
produce_set_test.go

@@ -32,10 +32,9 @@ func TestProduceSetInitial(t *testing.T) {
 }
 
 func TestProduceSetAddingMessages(t *testing.T) {
-	parent, ps := makeProduceSet()
-	parent.conf.Producer.Flush.MaxMessages = 1000
-
+	_, ps := makeProduceSet()
 	msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
+
 	safeAddMessage(t, ps, msg)
 
 	if ps.empty() {
@@ -45,8 +44,15 @@ func TestProduceSetAddingMessages(t *testing.T) {
 	if !ps.readyToFlush() {
 		t.Error("by default set should be ready to flush when any message is in place")
 	}
+}
 
-	for i := 0; i < 999; i++ {
+func TestProduceSetAddingMessagesOverflowMessagesLimit(t *testing.T) {
+	parent, ps := makeProduceSet()
+	parent.conf.Producer.Flush.MaxMessages = 1000
+
+	msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
+
+	for i := 0; i < 1000; i++ {
 		if ps.wouldOverflow(msg) {
 			t.Error("set shouldn't fill up after only", i+1, "messages")
 		}
@@ -58,6 +64,24 @@ func TestProduceSetAddingMessages(t *testing.T) {
 	}
 }
 
+func TestProduceSetAddingMessagesOverflowBytesLimit(t *testing.T) {
+	parent, ps := makeProduceSet()
+	parent.conf.Producer.MaxMessageBytes = 1000
+
+	msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
+
+	for ps.bufferBytes+msg.byteSize(2) < parent.conf.Producer.MaxMessageBytes {
+		if ps.wouldOverflow(msg) {
+			t.Error("set shouldn't fill up before 1000 bytes")
+		}
+		safeAddMessage(t, ps, msg)
+	}
+
+	if !ps.wouldOverflow(msg) {
+		t.Error("set should be full after 1000 bytes")
+	}
+}
+
 func TestProduceSetPartitionTracking(t *testing.T) {
 	_, ps := makeProduceSet()