|
@@ -4,6 +4,7 @@ import (
|
|
|
"fmt"
|
|
"fmt"
|
|
|
"net"
|
|
"net"
|
|
|
"os"
|
|
"os"
|
|
|
|
|
+ "strings"
|
|
|
"sync"
|
|
"sync"
|
|
|
"testing"
|
|
"testing"
|
|
|
"time"
|
|
"time"
|
|
@@ -11,20 +12,24 @@ import (
|
|
|
|
|
|
|
|
const (
|
|
const (
|
|
|
TestBatchSize = 1000
|
|
TestBatchSize = 1000
|
|
|
|
|
+
|
|
|
|
|
+ VagrantKafkaPeers = "192.168.100.67:6667,192.168.100.67:6668,192.168.100.67:6669,192.168.100.67:6670,192.168.100.67:6671"
|
|
|
|
|
+ VagrantZookeeperPeers = "192.168.100.67:2181,192.168.100.67:2182,192.168.100.67:2183,192.168.100.67:2184,192.168.100.67:2185"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
var (
|
|
var (
|
|
|
kafkaIsAvailable, kafkaShouldBeAvailable bool
|
|
kafkaIsAvailable, kafkaShouldBeAvailable bool
|
|
|
- kafkaAddr string
|
|
|
|
|
|
|
+ kafkaBrokers []string
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
func init() {
|
|
func init() {
|
|
|
- kafkaAddr = os.Getenv("KAFKA_ADDR")
|
|
|
|
|
- if kafkaAddr == "" {
|
|
|
|
|
- kafkaAddr = "localhost:6667"
|
|
|
|
|
|
|
+ kafkaPeers := os.Getenv("KAFKA_PEERS")
|
|
|
|
|
+ if kafkaPeers == "" {
|
|
|
|
|
+ kafkaPeers = VagrantKafkaPeers
|
|
|
}
|
|
}
|
|
|
|
|
+ kafkaBrokers = strings.Split(kafkaPeers, ",")
|
|
|
|
|
|
|
|
- if c, err := net.Dial("tcp", kafkaAddr); err == nil {
|
|
|
|
|
|
|
+ if c, err := net.Dial("tcp", kafkaBrokers[0]); err == nil {
|
|
|
if err = c.Close(); err == nil {
|
|
if err = c.Close(); err == nil {
|
|
|
kafkaIsAvailable = true
|
|
kafkaIsAvailable = true
|
|
|
}
|
|
}
|
|
@@ -36,9 +41,9 @@ func init() {
|
|
|
func checkKafkaAvailability(t *testing.T) {
|
|
func checkKafkaAvailability(t *testing.T) {
|
|
|
if !kafkaIsAvailable {
|
|
if !kafkaIsAvailable {
|
|
|
if kafkaShouldBeAvailable {
|
|
if kafkaShouldBeAvailable {
|
|
|
- t.Fatalf("Kafka broker is not available on %s. Set KAFKA_ADDR to connect to Kafka on a different location.", kafkaAddr)
|
|
|
|
|
|
|
+ t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
|
|
|
} else {
|
|
} else {
|
|
|
- t.Skipf("Kafka broker is not available on %s. Set KAFKA_ADDR to connect to Kafka on a different location.", kafkaAddr)
|
|
|
|
|
|
|
+ t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -91,7 +96,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
|
|
|
config.Producer.Flush.Frequency = 50 * time.Millisecond
|
|
config.Producer.Flush.Frequency = 50 * time.Millisecond
|
|
|
config.Producer.Flush.Messages = 200
|
|
config.Producer.Flush.Messages = 200
|
|
|
config.Producer.Return.Successes = true
|
|
config.Producer.Return.Successes = true
|
|
|
- producer, err := NewAsyncProducer([]string{kafkaAddr}, config)
|
|
|
|
|
|
|
+ producer, err := NewAsyncProducer(kafkaBrokers, config)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
t.Fatal(err)
|
|
|
}
|
|
}
|
|
@@ -125,7 +130,7 @@ func testProducingMessages(t *testing.T, config *Config) {
|
|
|
config.Producer.Return.Successes = true
|
|
config.Producer.Return.Successes = true
|
|
|
config.Consumer.Return.Errors = true
|
|
config.Consumer.Return.Errors = true
|
|
|
|
|
|
|
|
- client, err := NewClient([]string{kafkaAddr}, config)
|
|
|
|
|
|
|
+ client, err := NewClient(kafkaBrokers, config)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
t.Fatal(err)
|
|
|
}
|
|
}
|