|
|
@@ -1,19 +1,16 @@
|
|
|
package sarama
|
|
|
|
|
|
import (
|
|
|
- "fmt"
|
|
|
"log"
|
|
|
"net"
|
|
|
"os"
|
|
|
+ "strconv"
|
|
|
"strings"
|
|
|
- "sync"
|
|
|
"testing"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
const (
|
|
|
- TestBatchSize = 1000
|
|
|
-
|
|
|
VagrantKafkaPeers = "192.168.100.67:6667,192.168.100.67:6668,192.168.100.67:6669,192.168.100.67:6670,192.168.100.67:6671"
|
|
|
VagrantZookeeperPeers = "192.168.100.67:2181,192.168.100.67:2182,192.168.100.67:2183,192.168.100.67:2184,192.168.100.67:2185"
|
|
|
)
|
|
|
@@ -53,205 +50,44 @@ func checkKafkaAvailability(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func TestFuncConnectionFailure(t *testing.T) {
|
|
|
- config := NewConfig()
|
|
|
- config.Metadata.Retry.Max = 1
|
|
|
-
|
|
|
- _, err := NewClient([]string{"localhost:9000"}, config)
|
|
|
- if err != ErrOutOfBrokers {
|
|
|
- t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func TestFuncClientMetadata(t *testing.T) {
|
|
|
- checkKafkaAvailability(t)
|
|
|
-
|
|
|
- config := NewConfig()
|
|
|
- config.Metadata.Retry.Max = 1
|
|
|
- config.Metadata.Retry.Backoff = 10 * time.Millisecond
|
|
|
- client, err := NewClient(kafkaBrokers, config)
|
|
|
- if err != nil {
|
|
|
- t.Fatal(err)
|
|
|
- }
|
|
|
-
|
|
|
- if err := client.RefreshMetadata("unknown_topic"); err != ErrUnknownTopicOrPartition {
|
|
|
- t.Error("Expected ErrUnknownTopicOrPartition, got", err)
|
|
|
- }
|
|
|
-
|
|
|
- if _, err := client.Leader("unknown_topic", 0); err != ErrUnknownTopicOrPartition {
|
|
|
- t.Error("Expected ErrUnknownTopicOrPartition, got", err)
|
|
|
- }
|
|
|
-
|
|
|
- if _, err := client.Replicas("invalid/topic", 0); err != ErrUnknownTopicOrPartition {
|
|
|
- t.Error("Expected ErrUnknownTopicOrPartition, got", err)
|
|
|
- }
|
|
|
-
|
|
|
- partitions, err := client.Partitions("multi_partition")
|
|
|
- if err != nil {
|
|
|
- t.Error(err)
|
|
|
- }
|
|
|
- if len(partitions) != 2 {
|
|
|
- t.Errorf("Expected multi_partition topic to have 2 partitions, found %v", partitions)
|
|
|
- }
|
|
|
-
|
|
|
- partitions, err = client.Partitions("single_partition")
|
|
|
- if err != nil {
|
|
|
- t.Error(err)
|
|
|
- }
|
|
|
- if len(partitions) != 1 {
|
|
|
- t.Errorf("Expected single_partition topic to have 1 partitions, found %v", partitions)
|
|
|
- }
|
|
|
-
|
|
|
- safeClose(t, client)
|
|
|
-}
|
|
|
-
|
|
|
-func TestFuncProducing(t *testing.T) {
|
|
|
- config := NewConfig()
|
|
|
- testProducingMessages(t, config)
|
|
|
-}
|
|
|
-
|
|
|
-func TestFuncProducingGzip(t *testing.T) {
|
|
|
- config := NewConfig()
|
|
|
- config.Producer.Compression = CompressionGZIP
|
|
|
- testProducingMessages(t, config)
|
|
|
-}
|
|
|
-
|
|
|
-func TestFuncProducingSnappy(t *testing.T) {
|
|
|
- config := NewConfig()
|
|
|
- config.Producer.Compression = CompressionSnappy
|
|
|
- testProducingMessages(t, config)
|
|
|
-}
|
|
|
-
|
|
|
-func TestFuncProducingNoResponse(t *testing.T) {
|
|
|
- config := NewConfig()
|
|
|
- config.Producer.RequiredAcks = NoResponse
|
|
|
- testProducingMessages(t, config)
|
|
|
-}
|
|
|
-
|
|
|
-func TestFuncProducingFlushing(t *testing.T) {
|
|
|
- config := NewConfig()
|
|
|
- config.Producer.Flush.Messages = TestBatchSize / 8
|
|
|
- config.Producer.Flush.Frequency = 250 * time.Millisecond
|
|
|
- testProducingMessages(t, config)
|
|
|
-}
|
|
|
-
|
|
|
-func TestFuncMultiPartitionProduce(t *testing.T) {
|
|
|
- checkKafkaAvailability(t)
|
|
|
-
|
|
|
- config := NewConfig()
|
|
|
- config.ChannelBufferSize = 20
|
|
|
- config.Producer.Flush.Frequency = 50 * time.Millisecond
|
|
|
- config.Producer.Flush.Messages = 200
|
|
|
- config.Producer.Return.Successes = true
|
|
|
- producer, err := NewAsyncProducer(kafkaBrokers, config)
|
|
|
- if err != nil {
|
|
|
- t.Fatal(err)
|
|
|
- }
|
|
|
-
|
|
|
- var wg sync.WaitGroup
|
|
|
- wg.Add(TestBatchSize)
|
|
|
-
|
|
|
- for i := 1; i <= TestBatchSize; i++ {
|
|
|
-
|
|
|
- go func(i int, w *sync.WaitGroup) {
|
|
|
- defer w.Done()
|
|
|
- msg := &ProducerMessage{Topic: "multi_partition", Key: nil, Value: StringEncoder(fmt.Sprintf("hur %d", i))}
|
|
|
- producer.Input() <- msg
|
|
|
- select {
|
|
|
- case ret := <-producer.Errors():
|
|
|
- t.Fatal(ret.Err)
|
|
|
- case <-producer.Successes():
|
|
|
- }
|
|
|
- }(i, &wg)
|
|
|
- }
|
|
|
-
|
|
|
- wg.Wait()
|
|
|
- if err := producer.Close(); err != nil {
|
|
|
- t.Error(err)
|
|
|
+func checkKafkaVersion(t *testing.T, requiredVersion string) {
|
|
|
+ kafkaVersion := os.Getenv("KAFKA_VERSION")
|
|
|
+ if kafkaVersion == "" {
|
|
|
+ t.Logf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
|
|
|
+ } else {
|
|
|
+ available := parseKafkaVersion(kafkaVersion)
|
|
|
+ required := parseKafkaVersion(requiredVersion)
|
|
|
+ if !available.satisfies(required) {
|
|
|
+ t.Skipf("Kafka version %s is required for this test; you have %s. Skipping...", requiredVersion, kafkaVersion)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func TestProducingToInvalidTopic(t *testing.T) {
|
|
|
- checkKafkaAvailability(t)
|
|
|
+type kafkaVersion []int
|
|
|
|
|
|
- producer, err := NewSyncProducer(kafkaBrokers, nil)
|
|
|
- if err != nil {
|
|
|
- t.Fatal(err)
|
|
|
- }
|
|
|
-
|
|
|
- if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
|
|
|
- t.Error("Expected ErrUnknownTopicOrPartition, found", err)
|
|
|
- }
|
|
|
+func (kv kafkaVersion) satisfies(other kafkaVersion) bool {
|
|
|
+ var ov int
|
|
|
+ for index, v := range kv {
|
|
|
+ if len(other) <= index {
|
|
|
+ ov = 0
|
|
|
+ } else {
|
|
|
+ ov = other[index]
|
|
|
+ }
|
|
|
|
|
|
- if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
|
|
|
- t.Error("Expected ErrUnknownTopicOrPartition, found", err)
|
|
|
+ if v != ov {
|
|
|
+ return v > ov
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- safeClose(t, producer)
|
|
|
+ return true
|
|
|
}
|
|
|
|
|
|
-func testProducingMessages(t *testing.T, config *Config) {
|
|
|
- checkKafkaAvailability(t)
|
|
|
-
|
|
|
- config.Producer.Return.Successes = true
|
|
|
- config.Consumer.Return.Errors = true
|
|
|
-
|
|
|
- client, err := NewClient(kafkaBrokers, config)
|
|
|
- if err != nil {
|
|
|
- t.Fatal(err)
|
|
|
- }
|
|
|
-
|
|
|
- master, err := NewConsumerFromClient(client)
|
|
|
- if err != nil {
|
|
|
- t.Fatal(err)
|
|
|
- }
|
|
|
- consumer, err := master.ConsumePartition("single_partition", 0, OffsetNewest)
|
|
|
- if err != nil {
|
|
|
- t.Fatal(err)
|
|
|
- }
|
|
|
-
|
|
|
- producer, err := NewAsyncProducerFromClient(client)
|
|
|
- if err != nil {
|
|
|
- t.Fatal(err)
|
|
|
- }
|
|
|
-
|
|
|
- expectedResponses := TestBatchSize
|
|
|
- for i := 1; i <= TestBatchSize; {
|
|
|
- msg := &ProducerMessage{Topic: "single_partition", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
|
|
|
- select {
|
|
|
- case producer.Input() <- msg:
|
|
|
- i++
|
|
|
- case ret := <-producer.Errors():
|
|
|
- t.Fatal(ret.Err)
|
|
|
- case <-producer.Successes():
|
|
|
- expectedResponses--
|
|
|
- }
|
|
|
- }
|
|
|
- for expectedResponses > 0 {
|
|
|
- select {
|
|
|
- case ret := <-producer.Errors():
|
|
|
- t.Fatal(ret.Err)
|
|
|
- case <-producer.Successes():
|
|
|
- expectedResponses--
|
|
|
- }
|
|
|
+func parseKafkaVersion(version string) kafkaVersion {
|
|
|
+ numbers := strings.Split(version, ".")
|
|
|
+ result := make(kafkaVersion, 0, len(numbers))
|
|
|
+ for _, number := range numbers {
|
|
|
+ nr, _ := strconv.Atoi(number)
|
|
|
+ result = append(result, nr)
|
|
|
}
|
|
|
- safeClose(t, producer)
|
|
|
-
|
|
|
- for i := 1; i <= TestBatchSize; i++ {
|
|
|
- select {
|
|
|
- case <-time.After(10 * time.Second):
|
|
|
- t.Fatal("Not received any more events in the last 10 seconds.")
|
|
|
|
|
|
- case err := <-consumer.Errors():
|
|
|
- t.Error(err)
|
|
|
-
|
|
|
- case message := <-consumer.Messages():
|
|
|
- if string(message.Value) != fmt.Sprintf("testing %d", i) {
|
|
|
- t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- safeClose(t, consumer)
|
|
|
- safeClose(t, client)
|
|
|
+ return result
|
|
|
}
|