123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427 |
- package sarama
- import (
- "fmt"
- "testing"
- "time"
- )
- func makeProduceSet() (*asyncProducer, *produceSet) {
- conf := NewConfig()
- txnmgr, _ := newTransactionManager(conf, nil)
- parent := &asyncProducer{
- conf: conf,
- txnmgr: txnmgr,
- }
- return parent, newProduceSet(parent)
- }
- func safeAddMessage(t *testing.T, ps *produceSet, msg *ProducerMessage) {
- if err := ps.add(msg); err != nil {
- t.Error(err)
- }
- }
- func TestProduceSetInitial(t *testing.T) {
- _, ps := makeProduceSet()
- if !ps.empty() {
- t.Error("New produceSet should be empty")
- }
- if ps.readyToFlush() {
- t.Error("Empty produceSet must never be ready to flush")
- }
- }
- func TestProduceSetAddingMessages(t *testing.T) {
- _, ps := makeProduceSet()
- msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
- safeAddMessage(t, ps, msg)
- if ps.empty() {
- t.Error("set shouldn't be empty when a message is added")
- }
- if !ps.readyToFlush() {
- t.Error("by default set should be ready to flush when any message is in place")
- }
- }
- func TestProduceSetAddingMessagesOverflowMessagesLimit(t *testing.T) {
- parent, ps := makeProduceSet()
- parent.conf.Producer.Flush.MaxMessages = 1000
- msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
- for i := 0; i < 1000; i++ {
- if ps.wouldOverflow(msg) {
- t.Error("set shouldn't fill up after only", i+1, "messages")
- }
- safeAddMessage(t, ps, msg)
- }
- if !ps.wouldOverflow(msg) {
- t.Error("set should be full after 1000 messages")
- }
- }
- func TestProduceSetAddingMessagesOverflowBytesLimit(t *testing.T) {
- parent, ps := makeProduceSet()
- parent.conf.Producer.MaxMessageBytes = 1000
- msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
- for ps.bufferBytes+msg.byteSize(2) < parent.conf.Producer.MaxMessageBytes {
- if ps.wouldOverflow(msg) {
- t.Error("set shouldn't fill up before 1000 bytes")
- }
- safeAddMessage(t, ps, msg)
- }
- if !ps.wouldOverflow(msg) {
- t.Error("set should be full after 1000 bytes")
- }
- }
- func TestProduceSetPartitionTracking(t *testing.T) {
- _, ps := makeProduceSet()
- m1 := &ProducerMessage{Topic: "t1", Partition: 0}
- m2 := &ProducerMessage{Topic: "t1", Partition: 1}
- m3 := &ProducerMessage{Topic: "t2", Partition: 0}
- safeAddMessage(t, ps, m1)
- safeAddMessage(t, ps, m2)
- safeAddMessage(t, ps, m3)
- seenT1P0 := false
- seenT1P1 := false
- seenT2P0 := false
- ps.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
- if len(pSet.msgs) != 1 {
- t.Error("Wrong message count")
- }
- if topic == "t1" && partition == 0 {
- seenT1P0 = true
- } else if topic == "t1" && partition == 1 {
- seenT1P1 = true
- } else if topic == "t2" && partition == 0 {
- seenT2P0 = true
- }
- })
- if !seenT1P0 {
- t.Error("Didn't see t1p0")
- }
- if !seenT1P1 {
- t.Error("Didn't see t1p1")
- }
- if !seenT2P0 {
- t.Error("Didn't see t2p0")
- }
- if len(ps.dropPartition("t1", 1)) != 1 {
- t.Error("Got wrong messages back from dropping partition")
- }
- if ps.bufferCount != 2 {
- t.Error("Incorrect buffer count after dropping partition")
- }
- }
- func TestProduceSetRequestBuilding(t *testing.T) {
- parent, ps := makeProduceSet()
- parent.conf.Producer.RequiredAcks = WaitForAll
- parent.conf.Producer.Timeout = 10 * time.Second
- msg := &ProducerMessage{
- Topic: "t1",
- Partition: 0,
- Key: StringEncoder(TestMessage),
- Value: StringEncoder(TestMessage),
- }
- for i := 0; i < 10; i++ {
- safeAddMessage(t, ps, msg)
- }
- msg.Partition = 1
- for i := 0; i < 10; i++ {
- safeAddMessage(t, ps, msg)
- }
- msg.Topic = "t2"
- for i := 0; i < 10; i++ {
- safeAddMessage(t, ps, msg)
- }
- req := ps.buildRequest()
- if req.RequiredAcks != WaitForAll {
- t.Error("RequiredAcks not set properly")
- }
- if req.Timeout != 10000 {
- t.Error("Timeout not set properly")
- }
- if len(req.records) != 2 {
- t.Error("Wrong number of topics in request")
- }
- }
- func TestProduceSetCompressedRequestBuilding(t *testing.T) {
- parent, ps := makeProduceSet()
- parent.conf.Producer.RequiredAcks = WaitForAll
- parent.conf.Producer.Timeout = 10 * time.Second
- parent.conf.Producer.Compression = CompressionGZIP
- parent.conf.Version = V0_10_0_0
- msg := &ProducerMessage{
- Topic: "t1",
- Partition: 0,
- Key: StringEncoder(TestMessage),
- Value: StringEncoder(TestMessage),
- Timestamp: time.Now(),
- }
- for i := 0; i < 10; i++ {
- safeAddMessage(t, ps, msg)
- }
- req := ps.buildRequest()
- if req.Version != 2 {
- t.Error("Wrong request version")
- }
- for _, msgBlock := range req.records["t1"][0].MsgSet.Messages {
- msg := msgBlock.Msg
- err := msg.decodeSet()
- if err != nil {
- t.Error("Failed to decode set from payload")
- }
- for i, compMsgBlock := range msg.Set.Messages {
- compMsg := compMsgBlock.Msg
- if compMsg.Version != 1 {
- t.Error("Wrong compressed message version")
- }
- if compMsgBlock.Offset != int64(i) {
- t.Errorf("Wrong relative inner offset, expected %d, got %d", i, compMsgBlock.Offset)
- }
- }
- if msg.Version != 1 {
- t.Error("Wrong compressed parent message version")
- }
- }
- }
- func TestProduceSetV3RequestBuilding(t *testing.T) {
- parent, ps := makeProduceSet()
- parent.conf.Producer.RequiredAcks = WaitForAll
- parent.conf.Producer.Timeout = 10 * time.Second
- parent.conf.Version = V0_11_0_0
- now := time.Now()
- msg := &ProducerMessage{
- Topic: "t1",
- Partition: 0,
- Key: StringEncoder(TestMessage),
- Value: StringEncoder(TestMessage),
- Headers: []RecordHeader{
- {
- Key: []byte("header-1"),
- Value: []byte("value-1"),
- },
- {
- Key: []byte("header-2"),
- Value: []byte("value-2"),
- },
- {
- Key: []byte("header-3"),
- Value: []byte("value-3"),
- },
- },
- Timestamp: now,
- }
- for i := 0; i < 10; i++ {
- safeAddMessage(t, ps, msg)
- msg.Timestamp = msg.Timestamp.Add(time.Second)
- }
- req := ps.buildRequest()
- if req.Version != 3 {
- t.Error("Wrong request version")
- }
- batch := req.records["t1"][0].RecordBatch
- if batch.FirstTimestamp != now.Truncate(time.Millisecond) {
- t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
- }
- for i := 0; i < 10; i++ {
- rec := batch.Records[i]
- if rec.TimestampDelta != time.Duration(i)*time.Second {
- t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta)
- }
- if rec.OffsetDelta != int64(i) {
- t.Errorf("Wrong relative inner offset, expected %d, got %d", i, rec.OffsetDelta)
- }
- for j, h := range batch.Records[i].Headers {
- exp := fmt.Sprintf("header-%d", j+1)
- if string(h.Key) != exp {
- t.Errorf("Wrong header key, expected %v, got %v", exp, h.Key)
- }
- exp = fmt.Sprintf("value-%d", j+1)
- if string(h.Value) != exp {
- t.Errorf("Wrong header value, expected %v, got %v", exp, h.Value)
- }
- }
- }
- }
- func TestProduceSetIdempotentRequestBuilding(t *testing.T) {
- const pID = 1000
- const pEpoch = 1234
- config := NewConfig()
- config.Producer.RequiredAcks = WaitForAll
- config.Producer.Idempotent = true
- config.Version = V0_11_0_0
- parent := &asyncProducer{
- conf: config,
- txnmgr: &transactionManager{
- producerID: pID,
- producerEpoch: pEpoch,
- },
- }
- ps := newProduceSet(parent)
- now := time.Now()
- msg := &ProducerMessage{
- Topic: "t1",
- Partition: 0,
- Key: StringEncoder(TestMessage),
- Value: StringEncoder(TestMessage),
- Headers: []RecordHeader{
- {
- Key: []byte("header-1"),
- Value: []byte("value-1"),
- },
- {
- Key: []byte("header-2"),
- Value: []byte("value-2"),
- },
- {
- Key: []byte("header-3"),
- Value: []byte("value-3"),
- },
- },
- Timestamp: now,
- sequenceNumber: 123,
- }
- for i := 0; i < 10; i++ {
- safeAddMessage(t, ps, msg)
- msg.Timestamp = msg.Timestamp.Add(time.Second)
- }
- req := ps.buildRequest()
- if req.Version != 3 {
- t.Error("Wrong request version")
- }
- batch := req.records["t1"][0].RecordBatch
- if batch.FirstTimestamp != now.Truncate(time.Millisecond) {
- t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
- }
- if batch.ProducerID != pID {
- t.Errorf("Wrong producerID: %v", batch.ProducerID)
- }
- if batch.ProducerEpoch != pEpoch {
- t.Errorf("Wrong producerEpoch: %v", batch.ProducerEpoch)
- }
- if batch.FirstSequence != 123 {
- t.Errorf("Wrong first sequence: %v", batch.FirstSequence)
- }
- for i := 0; i < 10; i++ {
- rec := batch.Records[i]
- if rec.TimestampDelta != time.Duration(i)*time.Second {
- t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta)
- }
- if rec.OffsetDelta != int64(i) {
- t.Errorf("Wrong relative inner offset, expected %d, got %d", i, rec.OffsetDelta)
- }
- for j, h := range batch.Records[i].Headers {
- exp := fmt.Sprintf("header-%d", j+1)
- if string(h.Key) != exp {
- t.Errorf("Wrong header key, expected %v, got %v", exp, h.Key)
- }
- exp = fmt.Sprintf("value-%d", j+1)
- if string(h.Value) != exp {
- t.Errorf("Wrong header value, expected %v, got %v", exp, h.Value)
- }
- }
- }
- }
- func TestProduceSetConsistentTimestamps(t *testing.T) {
- parent, ps1 := makeProduceSet()
- ps2 := newProduceSet(parent)
- parent.conf.Producer.RequiredAcks = WaitForAll
- parent.conf.Producer.Timeout = 10 * time.Second
- parent.conf.Version = V0_11_0_0
- msg1 := &ProducerMessage{
- Topic: "t1",
- Partition: 0,
- Key: StringEncoder(TestMessage),
- Value: StringEncoder(TestMessage),
- Timestamp: time.Unix(1555718400, 500000000),
- sequenceNumber: 123,
- }
- msg2 := &ProducerMessage{
- Topic: "t1",
- Partition: 0,
- Key: StringEncoder(TestMessage),
- Value: StringEncoder(TestMessage),
- Timestamp: time.Unix(1555718400, 500900000),
- sequenceNumber: 123,
- }
- msg3 := &ProducerMessage{
- Topic: "t1",
- Partition: 0,
- Key: StringEncoder(TestMessage),
- Value: StringEncoder(TestMessage),
- Timestamp: time.Unix(1555718400, 600000000),
- sequenceNumber: 123,
- }
- safeAddMessage(t, ps1, msg1)
- safeAddMessage(t, ps1, msg3)
- req1 := ps1.buildRequest()
- if req1.Version != 3 {
- t.Error("Wrong request version")
- }
- batch1 := req1.records["t1"][0].RecordBatch
- ft1 := batch1.FirstTimestamp.Unix()*1000 + int64(batch1.FirstTimestamp.Nanosecond()/1000000)
- time1 := ft1 + int64(batch1.Records[1].TimestampDelta/time.Millisecond)
- safeAddMessage(t, ps2, msg2)
- safeAddMessage(t, ps2, msg3)
- req2 := ps2.buildRequest()
- if req2.Version != 3 {
- t.Error("Wrong request version")
- }
- batch2 := req2.records["t1"][0].RecordBatch
- ft2 := batch2.FirstTimestamp.Unix()*1000 + int64(batch2.FirstTimestamp.Nanosecond()/1000000)
- time2 := ft2 + int64(batch2.Records[1].TimestampDelta/time.Millisecond)
- if time1 != time2 {
- t.Errorf("Message timestamps do not match: %v, %v", time1, time2)
- }
- }
|