|
|
@@ -2,6 +2,8 @@ package sarama
|
|
|
|
|
|
import (
|
|
|
"fmt"
|
|
|
+ "net"
|
|
|
+ "os"
|
|
|
"testing"
|
|
|
"time"
|
|
|
)
|
|
|
@@ -10,8 +12,43 @@ const (
|
|
|
TestBatchSize = 1000
|
|
|
)
|
|
|
|
|
|
+var (
|
|
|
+ kafkaIsAvailable, kafkaShouldBeAvailable bool
|
|
|
+ kafkaAddr string
|
|
|
+)
|
|
|
+
|
|
|
+func init() {
|
|
|
+ env := func(key, defaultValue string) string {
|
|
|
+ if value := os.Getenv(key); value != "" {
|
|
|
+ return value
|
|
|
+ }
|
|
|
+ return defaultValue
|
|
|
+ }
|
|
|
+
|
|
|
+ kafkaAddr := env("KAFKA_ADDR", "localhost:9092")
|
|
|
+ c, err := net.Dial("tcp", kafkaAddr)
|
|
|
+ if err == nil {
|
|
|
+ kafkaIsAvailable = true
|
|
|
+ c.Close()
|
|
|
+ }
|
|
|
+
|
|
|
+ kafkaShouldBeAvailable = (env("CI", "") != "")
|
|
|
+}
|
|
|
+
|
|
|
+func checkKafkaAvailability(t *testing.T) {
|
|
|
+ if !kafkaIsAvailable {
|
|
|
+ if kafkaShouldBeAvailable {
|
|
|
+ t.Fatal("Kafka broker is not available")
|
|
|
+ } else {
|
|
|
+ t.Skip("Kafka broker is not available")
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func TestProducingMessages(t *testing.T) {
|
|
|
- client, err := NewClient("functional_test", []string{"localhost:9092"}, nil)
|
|
|
+ checkKafkaAvailability(t)
|
|
|
+
|
|
|
+ client, err := NewClient("functional_test", []string{kafkaAddr}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|