123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- package sarama
- import (
- "testing"
- "time"
- )
- func makeProduceSet() (*asyncProducer, *produceSet) {
- parent := &asyncProducer{
- conf: NewConfig(),
- }
- return parent, newProduceSet(parent)
- }
- func safeAddMessage(t *testing.T, ps *produceSet, msg *ProducerMessage) {
- if err := ps.add(msg); err != nil {
- t.Error(err)
- }
- }
- func TestProduceSetInitial(t *testing.T) {
- _, ps := makeProduceSet()
- if !ps.empty() {
- t.Error("New produceSet should be empty")
- }
- if ps.readyToFlush() {
- t.Error("Empty produceSet must never be ready to flush")
- }
- }
- func TestProduceSetAddingMessages(t *testing.T) {
- parent, ps := makeProduceSet()
- parent.conf.Producer.Flush.MaxMessages = 1000
- msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
- safeAddMessage(t, ps, msg)
- if ps.empty() {
- t.Error("set shouldn't be empty when a message is added")
- }
- if !ps.readyToFlush() {
- t.Error("by default set should be ready to flush when any message is in place")
- }
- for i := 0; i < 999; i++ {
- if ps.wouldOverflow(msg) {
- t.Error("set shouldn't fill up after only", i+1, "messages")
- }
- safeAddMessage(t, ps, msg)
- }
- if !ps.wouldOverflow(msg) {
- t.Error("set should be full after 1000 messages")
- }
- }
- func TestProduceSetPartitionTracking(t *testing.T) {
- _, ps := makeProduceSet()
- m1 := &ProducerMessage{Topic: "t1", Partition: 0}
- m2 := &ProducerMessage{Topic: "t1", Partition: 1}
- m3 := &ProducerMessage{Topic: "t2", Partition: 0}
- safeAddMessage(t, ps, m1)
- safeAddMessage(t, ps, m2)
- safeAddMessage(t, ps, m3)
- seenT1P0 := false
- seenT1P1 := false
- seenT2P0 := false
- ps.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
- if len(msgs) != 1 {
- t.Error("Wrong message count")
- }
- if topic == "t1" && partition == 0 {
- seenT1P0 = true
- } else if topic == "t1" && partition == 1 {
- seenT1P1 = true
- } else if topic == "t2" && partition == 0 {
- seenT2P0 = true
- }
- })
- if !seenT1P0 {
- t.Error("Didn't see t1p0")
- }
- if !seenT1P1 {
- t.Error("Didn't see t1p1")
- }
- if !seenT2P0 {
- t.Error("Didn't see t2p0")
- }
- if len(ps.dropPartition("t1", 1)) != 1 {
- t.Error("Got wrong messages back from dropping partition")
- }
- if ps.bufferCount != 2 {
- t.Error("Incorrect buffer count after dropping partition")
- }
- }
- func TestProduceSetRequestBuilding(t *testing.T) {
- parent, ps := makeProduceSet()
- parent.conf.Producer.RequiredAcks = WaitForAll
- parent.conf.Producer.Timeout = 10 * time.Second
- msg := &ProducerMessage{
- Topic: "t1",
- Partition: 0,
- Key: StringEncoder(TestMessage),
- Value: StringEncoder(TestMessage),
- }
- for i := 0; i < 10; i++ {
- safeAddMessage(t, ps, msg)
- }
- msg.Partition = 1
- for i := 0; i < 10; i++ {
- safeAddMessage(t, ps, msg)
- }
- msg.Topic = "t2"
- for i := 0; i < 10; i++ {
- safeAddMessage(t, ps, msg)
- }
- req := ps.buildRequest()
- if req.RequiredAcks != WaitForAll {
- t.Error("RequiredAcks not set properly")
- }
- if req.Timeout != 10000 {
- t.Error("Timeout not set properly")
- }
- if len(req.msgSets) != 2 {
- t.Error("Wrong number of topics in request")
- }
- }
|