|
@@ -12,6 +12,12 @@ func makeProduceSet() (*asyncProducer, *produceSet) {
|
|
|
return parent, newProduceSet(parent)
|
|
|
}
|
|
|
|
|
|
+func safeAddMessage(t *testing.T, ps *produceSet, msg *ProducerMessage) {
|
|
|
+ if err := ps.add(msg); err != nil {
|
|
|
+ t.Error(err)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func TestProduceSetInitial(t *testing.T) {
|
|
|
_, ps := makeProduceSet()
|
|
|
|
|
@@ -29,7 +35,7 @@ func TestProduceSetAddingMessages(t *testing.T) {
|
|
|
parent.conf.Producer.Flush.MaxMessages = 1000
|
|
|
|
|
|
msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
|
|
|
- ps.add(msg)
|
|
|
+ safeAddMessage(t, ps, msg)
|
|
|
|
|
|
if ps.empty() {
|
|
|
t.Error("set shouldn't be empty when a message is added")
|
|
@@ -43,7 +49,7 @@ func TestProduceSetAddingMessages(t *testing.T) {
|
|
|
if ps.wouldOverflow(msg) {
|
|
|
t.Error("set shouldn't fill up after only", i+1, "messages")
|
|
|
}
|
|
|
- ps.add(msg)
|
|
|
+ safeAddMessage(t, ps, msg)
|
|
|
}
|
|
|
|
|
|
if !ps.wouldOverflow(msg) {
|
|
@@ -57,9 +63,9 @@ func TestProduceSetPartitionTracking(t *testing.T) {
|
|
|
m1 := &ProducerMessage{Topic: "t1", Partition: 0}
|
|
|
m2 := &ProducerMessage{Topic: "t1", Partition: 1}
|
|
|
m3 := &ProducerMessage{Topic: "t2", Partition: 0}
|
|
|
- ps.add(m1)
|
|
|
- ps.add(m2)
|
|
|
- ps.add(m3)
|
|
|
+ safeAddMessage(t, ps, m1)
|
|
|
+ safeAddMessage(t, ps, m2)
|
|
|
+ safeAddMessage(t, ps, m3)
|
|
|
|
|
|
seenT1P0 := false
|
|
|
seenT1P1 := false
|
|
@@ -110,15 +116,15 @@ func TestProduceSetRequestBuilding(t *testing.T) {
|
|
|
Value: StringEncoder(TestMessage),
|
|
|
}
|
|
|
for i := 0; i < 10; i++ {
|
|
|
- ps.add(msg)
|
|
|
+ safeAddMessage(t, ps, msg)
|
|
|
}
|
|
|
msg.Partition = 1
|
|
|
for i := 0; i < 10; i++ {
|
|
|
- ps.add(msg)
|
|
|
+ safeAddMessage(t, ps, msg)
|
|
|
}
|
|
|
msg.Topic = "t2"
|
|
|
for i := 0; i < 10; i++ {
|
|
|
- ps.add(msg)
|
|
|
+ safeAddMessage(t, ps, msg)
|
|
|
}
|
|
|
|
|
|
req := ps.buildRequest()
|