|
|
@@ -2,6 +2,8 @@ package sarama
|
|
|
|
|
|
import (
|
|
|
"fmt"
|
|
|
+ "net"
|
|
|
+ "os"
|
|
|
"testing"
|
|
|
"time"
|
|
|
)
|
|
|
@@ -10,8 +12,40 @@ const (
|
|
|
TestBatchSize = 1000
|
|
|
)
|
|
|
|
|
|
+var (
|
|
|
+ kafkaIsAvailable, kafkaShouldBeAvailable bool
|
|
|
+ kafkaAddr string
|
|
|
+)
|
|
|
+
|
|
|
+func init() {
|
|
|
+ kafkaAddr = os.Getenv("KAFKA_ADDR")
|
|
|
+ if kafkaAddr == "" {
|
|
|
+ kafkaAddr = "localhost:9092"
|
|
|
+ }
|
|
|
+
|
|
|
+ c, err := net.Dial("tcp", kafkaAddr)
|
|
|
+ if err == nil {
|
|
|
+ kafkaIsAvailable = true
|
|
|
+ c.Close()
|
|
|
+ }
|
|
|
+
|
|
|
+ kafkaShouldBeAvailable = os.Getenv("CI") != ""
|
|
|
+}
|
|
|
+
|
|
|
+func checkKafkaAvailability(t *testing.T) {
|
|
|
+ if !kafkaIsAvailable {
|
|
|
+ if kafkaShouldBeAvailable {
|
|
|
+ t.Fatalf("Kafka broker is not available on %s. Set KAFKA_ADDR to connect to Kafka on a different location.", kafkaAddr)
|
|
|
+ } else {
|
|
|
+ t.Skipf("Kafka broker is not available on %s. Set KAFKA_ADDR to connect to Kafka on a different location.", kafkaAddr)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func TestProducingMessages(t *testing.T) {
|
|
|
- client, err := NewClient("functional_test", []string{"localhost:9092"}, nil)
|
|
|
+ checkKafkaAvailability(t)
|
|
|
+
|
|
|
+ client, err := NewClient("functional_test", []string{kafkaAddr}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|