|
@@ -35,10 +35,9 @@ func TestProduceSetInitial(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestProduceSetAddingMessages(t *testing.T) {
|
|
|
- parent, ps := makeProduceSet()
|
|
|
- parent.conf.Producer.Flush.MaxMessages = 1000
|
|
|
-
|
|
|
+ _, ps := makeProduceSet()
|
|
|
msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
|
|
|
+
|
|
|
safeAddMessage(t, ps, msg)
|
|
|
|
|
|
if ps.empty() {
|
|
@@ -48,8 +47,15 @@ func TestProduceSetAddingMessages(t *testing.T) {
|
|
|
if !ps.readyToFlush() {
|
|
|
t.Error("by default set should be ready to flush when any message is in place")
|
|
|
}
|
|
|
+}
|
|
|
|
|
|
- for i := 0; i < 999; i++ {
|
|
|
+func TestProduceSetAddingMessagesOverflowMessagesLimit(t *testing.T) {
|
|
|
+ parent, ps := makeProduceSet()
|
|
|
+ parent.conf.Producer.Flush.MaxMessages = 1000
|
|
|
+
|
|
|
+ msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
|
|
|
+
|
|
|
+ for i := 0; i < 1000; i++ {
|
|
|
if ps.wouldOverflow(msg) {
|
|
|
t.Error("set shouldn't fill up after only", i+1, "messages")
|
|
|
}
|
|
@@ -61,6 +67,24 @@ func TestProduceSetAddingMessages(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func TestProduceSetAddingMessagesOverflowBytesLimit(t *testing.T) {
|
|
|
+ parent, ps := makeProduceSet()
|
|
|
+ parent.conf.Producer.MaxMessageBytes = 1000
|
|
|
+
|
|
|
+ msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
|
|
|
+
|
|
|
+ for ps.bufferBytes+msg.byteSize(2) < parent.conf.Producer.MaxMessageBytes {
|
|
|
+ if ps.wouldOverflow(msg) {
|
|
|
+ t.Error("set shouldn't fill up before 1000 bytes")
|
|
|
+ }
|
|
|
+ safeAddMessage(t, ps, msg)
|
|
|
+ }
|
|
|
+
|
|
|
+ if !ps.wouldOverflow(msg) {
|
|
|
+ t.Error("set should be full after 1000 bytes")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func TestProduceSetPartitionTracking(t *testing.T) {
|
|
|
_, ps := makeProduceSet()
|
|
|
|