123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482 |
- //+build functional
- package sarama
- import (
- "fmt"
- "os"
- "strconv"
- "strings"
- "sync"
- "testing"
- "time"
- toxiproxy "github.com/Shopify/toxiproxy/client"
- "github.com/rcrowley/go-metrics"
- )
- const TestBatchSize = 1000
- func TestFuncProducing(t *testing.T) {
- config := NewConfig()
- testProducingMessages(t, config)
- }
- func TestFuncProducingGzip(t *testing.T) {
- config := NewConfig()
- config.Producer.Compression = CompressionGZIP
- testProducingMessages(t, config)
- }
- func TestFuncProducingSnappy(t *testing.T) {
- config := NewConfig()
- config.Producer.Compression = CompressionSnappy
- testProducingMessages(t, config)
- }
- func TestFuncProducingZstd(t *testing.T) {
- config := NewConfig()
- config.Version = V2_1_0_0
- config.Producer.Compression = CompressionZSTD
- testProducingMessages(t, config)
- }
- func TestFuncProducingNoResponse(t *testing.T) {
- config := NewConfig()
- config.Producer.RequiredAcks = NoResponse
- testProducingMessages(t, config)
- }
- func TestFuncProducingFlushing(t *testing.T) {
- config := NewConfig()
- config.Producer.Flush.Messages = TestBatchSize / 8
- config.Producer.Flush.Frequency = 250 * time.Millisecond
- testProducingMessages(t, config)
- }
- func TestFuncMultiPartitionProduce(t *testing.T) {
- setupFunctionalTest(t)
- defer teardownFunctionalTest(t)
- config := NewConfig()
- config.ChannelBufferSize = 20
- config.Producer.Flush.Frequency = 50 * time.Millisecond
- config.Producer.Flush.Messages = 200
- config.Producer.Return.Successes = true
- producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
- if err != nil {
- t.Fatal(err)
- }
- var wg sync.WaitGroup
- wg.Add(TestBatchSize)
- for i := 1; i <= TestBatchSize; i++ {
- go func(i int) {
- defer wg.Done()
- msg := &ProducerMessage{Topic: "test.64", Key: nil, Value: StringEncoder(fmt.Sprintf("hur %d", i))}
- if _, _, err := producer.SendMessage(msg); err != nil {
- t.Error(i, err)
- }
- }(i)
- }
- wg.Wait()
- if err := producer.Close(); err != nil {
- t.Error(err)
- }
- }
- func TestFuncProducingToInvalidTopic(t *testing.T) {
- setupFunctionalTest(t)
- defer teardownFunctionalTest(t)
- producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
- if err != nil {
- t.Fatal(err)
- }
- if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
- t.Error("Expected ErrUnknownTopicOrPartition, found", err)
- }
- if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
- t.Error("Expected ErrUnknownTopicOrPartition, found", err)
- }
- safeClose(t, producer)
- }
- func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
- setupFunctionalTest(t)
- defer teardownFunctionalTest(t)
- config := NewConfig()
- config.Producer.Flush.Frequency = 250 * time.Millisecond
- config.Producer.Idempotent = true
- config.Producer.Timeout = 500 * time.Millisecond
- config.Producer.Retry.Max = 1
- config.Producer.Retry.Backoff = 500 * time.Millisecond
- config.Producer.Return.Successes = true
- config.Producer.Return.Errors = true
- config.Producer.RequiredAcks = WaitForAll
- config.Net.MaxOpenRequests = 1
- config.Version = V0_11_0_0
- producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
- if err != nil {
- t.Fatal(err)
- }
- defer safeClose(t, producer)
- // Successfully publish a few messages
- for i := 0; i < 10; i++ {
- _, _, err = producer.SendMessage(&ProducerMessage{
- Topic: "test.1",
- Value: StringEncoder(fmt.Sprintf("%d message", i)),
- })
- if err != nil {
- t.Fatal(err)
- }
- }
- // break the brokers.
- for proxyName, proxy := range FunctionalTestEnv.Proxies {
- if !strings.Contains(proxyName, "kafka") {
- continue
- }
- if err := proxy.Disable(); err != nil {
- t.Fatal(err)
- }
- }
- // This should fail hard now
- for i := 10; i < 20; i++ {
- _, _, err = producer.SendMessage(&ProducerMessage{
- Topic: "test.1",
- Value: StringEncoder(fmt.Sprintf("%d message", i)),
- })
- if err == nil {
- t.Fatal(err)
- }
- }
- // Now bring the proxy back up
- for proxyName, proxy := range FunctionalTestEnv.Proxies {
- if !strings.Contains(proxyName, "kafka") {
- continue
- }
- if err := proxy.Enable(); err != nil {
- t.Fatal(err)
- }
- }
- // We should be able to publish again (once everything calms down)
- // (otherwise it times out)
- for {
- _, _, err = producer.SendMessage(&ProducerMessage{
- Topic: "test.1",
- Value: StringEncoder("comeback message"),
- })
- if err == nil {
- break
- }
- }
- }
- func TestInterceptors(t *testing.T) {
- config := NewConfig()
- setupFunctionalTest(t)
- defer teardownFunctionalTest(t)
- config.Producer.Return.Successes = true
- config.Consumer.Return.Errors = true
- config.Producer.Interceptors = []ProducerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 100}}
- config.Consumer.Interceptors = []ConsumerInterceptor{&appendInterceptor{i: 20}}
- client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
- if err != nil {
- t.Fatal(err)
- }
- initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
- if err != nil {
- t.Fatal(err)
- }
- producer, err := NewAsyncProducerFromClient(client)
- if err != nil {
- t.Fatal(err)
- }
- for i := 0; i < 10; i++ {
- producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
- }
- for i := 0; i < 10; i++ {
- select {
- case msg := <-producer.Errors():
- t.Error(msg.Err)
- case msg := <-producer.Successes():
- v, _ := msg.Value.Encode()
- expected := TestMessage + strconv.Itoa(i) + strconv.Itoa(i+100)
- if string(v) != expected {
- t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
- }
- }
- }
- safeClose(t, producer)
- master, err := NewConsumerFromClient(client)
- if err != nil {
- t.Fatal(err)
- }
- consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
- if err != nil {
- t.Fatal(err)
- }
- for i := 0; i < 10; i++ {
- select {
- case <-time.After(10 * time.Second):
- t.Fatal("Not received any more events in the last 10 seconds.")
- case err := <-consumer.Errors():
- t.Error(err)
- case msg := <-consumer.Messages():
- prodInteExpectation := strconv.Itoa(i) + strconv.Itoa(i+100)
- consInteExpectation := strconv.Itoa(i + 20)
- expected := TestMessage + prodInteExpectation + consInteExpectation
- v := string(msg.Value)
- if v != expected {
- t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
- }
- }
- }
- safeClose(t, consumer)
- safeClose(t, client)
- }
- func testProducingMessages(t *testing.T, config *Config) {
- setupFunctionalTest(t)
- defer teardownFunctionalTest(t)
- // Configure some latency in order to properly validate the request latency metric
- for _, proxy := range FunctionalTestEnv.Proxies {
- if _, err := proxy.AddToxic("", "latency", "", 1, toxiproxy.Attributes{"latency": 10}); err != nil {
- t.Fatal("Unable to configure latency toxicity", err)
- }
- }
- config.Producer.Return.Successes = true
- config.Consumer.Return.Errors = true
- client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
- if err != nil {
- t.Fatal(err)
- }
- // Keep in mind the current offset
- initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
- if err != nil {
- t.Fatal(err)
- }
- producer, err := NewAsyncProducerFromClient(client)
- if err != nil {
- t.Fatal(err)
- }
- expectedResponses := TestBatchSize
- for i := 1; i <= TestBatchSize; {
- msg := &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
- select {
- case producer.Input() <- msg:
- i++
- case ret := <-producer.Errors():
- t.Fatal(ret.Err)
- case <-producer.Successes():
- expectedResponses--
- }
- }
- for expectedResponses > 0 {
- select {
- case ret := <-producer.Errors():
- t.Fatal(ret.Err)
- case <-producer.Successes():
- expectedResponses--
- }
- }
- safeClose(t, producer)
- // Validate producer metrics before using the consumer minus the offset request
- validateMetrics(t, client)
- master, err := NewConsumerFromClient(client)
- if err != nil {
- t.Fatal(err)
- }
- consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
- if err != nil {
- t.Fatal(err)
- }
- for i := 1; i <= TestBatchSize; i++ {
- select {
- case <-time.After(10 * time.Second):
- t.Fatal("Not received any more events in the last 10 seconds.")
- case err := <-consumer.Errors():
- t.Error(err)
- case message := <-consumer.Messages():
- if string(message.Value) != fmt.Sprintf("testing %d", i) {
- t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
- }
- }
- }
- safeClose(t, consumer)
- safeClose(t, client)
- }
- func validateMetrics(t *testing.T, client Client) {
- // Get the broker used by test1 topic
- var broker *Broker
- if partitions, err := client.Partitions("test.1"); err != nil {
- t.Error(err)
- } else {
- for _, partition := range partitions {
- if b, err := client.Leader("test.1", partition); err != nil {
- t.Error(err)
- } else {
- if broker != nil && b != broker {
- t.Fatal("Expected only one broker, got at least 2")
- }
- broker = b
- }
- }
- }
- metricValidators := newMetricValidators()
- noResponse := client.Config().Producer.RequiredAcks == NoResponse
- compressionEnabled := client.Config().Producer.Compression != CompressionNone
- // We are adding 10ms of latency to all requests with toxiproxy
- minRequestLatencyInMs := 10
- if noResponse {
- // but when we do not wait for a response it can be less than 1ms
- minRequestLatencyInMs = 0
- }
- // We read at least 1 byte from the broker
- metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1))
- // in at least 3 global requests (1 for metadata request, 1 for offset request and N for produce request)
- metricValidators.register(minCountMeterValidator("request-rate", 3))
- metricValidators.register(minCountHistogramValidator("request-size", 3))
- metricValidators.register(minValHistogramValidator("request-size", 1))
- // and at least 2 requests to the registered broker (offset + produces)
- metricValidators.registerForBroker(broker, minCountMeterValidator("request-rate", 2))
- metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2))
- metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1))
- metricValidators.registerForBroker(broker, minValHistogramValidator("request-latency-in-ms", minRequestLatencyInMs))
- // We send at least 1 batch
- metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("batch-size", 1))
- metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("batch-size", 1))
- if compressionEnabled {
- // We record compression ratios between [0.50,-10.00] (50-1000 with a histogram) for at least one "fake" record
- metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 1))
- metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50))
- metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 1000))
- } else {
- // We record compression ratios of 1.00 (100 with a histogram) for every TestBatchSize record
- metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize))
- metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 100))
- metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 100))
- }
- // We send exactly TestBatchSize messages
- metricValidators.registerForGlobalAndTopic("test_1", countMeterValidator("record-send-rate", TestBatchSize))
- // We send at least one record per request
- metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("records-per-request", 1))
- metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("records-per-request", 1))
- // We receive at least 1 byte from the broker
- metricValidators.registerForAllBrokers(broker, minCountMeterValidator("outgoing-byte-rate", 1))
- if noResponse {
- // in exactly 2 global responses (metadata + offset)
- metricValidators.register(countMeterValidator("response-rate", 2))
- metricValidators.register(minCountHistogramValidator("response-size", 2))
- // and exactly 1 offset response for the registered broker
- metricValidators.registerForBroker(broker, countMeterValidator("response-rate", 1))
- metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 1))
- metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
- } else {
- // in at least 3 global responses (metadata + offset + produces)
- metricValidators.register(minCountMeterValidator("response-rate", 3))
- metricValidators.register(minCountHistogramValidator("response-size", 3))
- // and at least 2 for the registered broker
- metricValidators.registerForBroker(broker, minCountMeterValidator("response-rate", 2))
- metricValidators.registerForBroker(broker, minCountHistogramValidator("response-size", 2))
- metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
- }
- // There should be no requests in flight anymore
- metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0))
- // Run the validators
- metricValidators.run(t, client.Config().MetricRegistry)
- }
- // Benchmarks
- func BenchmarkProducerSmall(b *testing.B) {
- benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 128)))
- }
- func BenchmarkProducerMedium(b *testing.B) {
- benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 1024)))
- }
- func BenchmarkProducerLarge(b *testing.B) {
- benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 8192)))
- }
- func BenchmarkProducerSmallSinglePartition(b *testing.B) {
- benchmarkProducer(b, nil, "test.1", ByteEncoder(make([]byte, 128)))
- }
- func BenchmarkProducerMediumSnappy(b *testing.B) {
- conf := NewConfig()
- conf.Producer.Compression = CompressionSnappy
- benchmarkProducer(b, conf, "test.1", ByteEncoder(make([]byte, 1024)))
- }
- func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) {
- setupFunctionalTest(b)
- defer teardownFunctionalTest(b)
- metricsDisable := os.Getenv("METRICS_DISABLE")
- if metricsDisable != "" {
- previousUseNilMetrics := metrics.UseNilMetrics
- Logger.Println("Disabling metrics using no-op implementation")
- metrics.UseNilMetrics = true
- // Restore previous setting
- defer func() {
- metrics.UseNilMetrics = previousUseNilMetrics
- }()
- }
- producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, conf)
- if err != nil {
- b.Fatal(err)
- }
- b.ResetTimer()
- for i := 1; i <= b.N; {
- msg := &ProducerMessage{Topic: topic, Key: StringEncoder(fmt.Sprintf("%d", i)), Value: value}
- select {
- case producer.Input() <- msg:
- i++
- case ret := <-producer.Errors():
- b.Fatal(ret.Err)
- }
- }
- safeClose(b, producer)
- }
|