Selaa lähdekoodia

Add some simple producer benchmarks

Evan Huus 10 vuotta sitten
vanhempi
commit
fbe4417e0b
3 muutettua tiedostoa jossa 45 lisäystä ja 3 poistoa
  1. 1 1
      client_test.go
  2. 42 0
      functional_producer_test.go
  3. 2 2
      functional_test.go

+ 1 - 1
client_test.go

@@ -7,7 +7,7 @@ import (
 	"time"
 )
 
-func safeClose(t *testing.T, c io.Closer) {
+func safeClose(t testing.TB, c io.Closer) {
 	err := c.Close()
 	if err != nil {
 		t.Error(err)

+ 42 - 0
functional_producer_test.go

@@ -155,3 +155,45 @@ func testProducingMessages(t *testing.T, config *Config) {
 	safeClose(t, consumer)
 	safeClose(t, client)
 }
+
+// Benchmarks
+
+func BenchmarkProducerSmall(b *testing.B) {
+	benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 128)))
+}
+func BenchmarkProducerMedium(b *testing.B) {
+	benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 1024)))
+}
+func BenchmarkProducerLarge(b *testing.B) {
+	benchmarkProducer(b, nil, "test.64", ByteEncoder(make([]byte, 8192)))
+}
+func BenchmarkProducerSmallSinglePartition(b *testing.B) {
+	benchmarkProducer(b, nil, "test.1", ByteEncoder(make([]byte, 128)))
+}
+func BenchmarkProducerMediumSnappy(b *testing.B) {
+	conf := NewConfig()
+	conf.Producer.Compression = CompressionSnappy
+	benchmarkProducer(b, conf, "test.1", ByteEncoder(make([]byte, 1024)))
+}
+
+func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) {
+	checkKafkaAvailability(b)
+
+	producer, err := NewAsyncProducer(kafkaBrokers, conf)
+	if err != nil {
+		b.Fatal(err)
+	}
+
+	b.ResetTimer()
+
+	for i := 1; i <= b.N; {
+		msg := &ProducerMessage{Topic: topic, Key: StringEncoder(fmt.Sprintf("%d", i)), Value: value}
+		select {
+		case producer.Input() <- msg:
+			i++
+		case ret := <-producer.Errors():
+			b.Fatal(ret.Err)
+		}
+	}
+	safeClose(b, producer)
+}

+ 2 - 2
functional_test.go

@@ -58,7 +58,7 @@ func init() {
 	kafkaShouldBeAvailable = os.Getenv("CI") != ""
 }
 
-func checkKafkaAvailability(t *testing.T) {
+func checkKafkaAvailability(t testing.TB) {
 	if !kafkaIsAvailable {
 		if kafkaShouldBeAvailable {
 			t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
@@ -68,7 +68,7 @@ func checkKafkaAvailability(t *testing.T) {
 	}
 }
 
-func checkKafkaVersion(t *testing.T, requiredVersion string) {
+func checkKafkaVersion(t testing.TB, requiredVersion string) {
 	kafkaVersion := os.Getenv("KAFKA_VERSION")
 	if kafkaVersion == "" {
 		t.Logf("No KAFKA_VERSION set. This tests requires Kafka version %s or higher. Continuing...", requiredVersion)