| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- package sarama
- import (
- "fmt"
- "net"
- "os"
- "testing"
- "time"
- )
- const (
- TestBatchSize = 1000
- )
- var (
- kafkaIsAvailable, kafkaShouldBeAvailable bool
- kafkaAddr string
- )
- func init() {
- kafkaAddr = os.Getenv("KAFKA_ADDR")
- if kafkaAddr == "" {
- kafkaAddr = "localhost:9092"
- }
- c, err := net.Dial("tcp", kafkaAddr)
- if err == nil {
- kafkaIsAvailable = true
- c.Close()
- }
- kafkaShouldBeAvailable = os.Getenv("CI") != ""
- }
- func checkKafkaAvailability(t *testing.T) {
- if !kafkaIsAvailable {
- if kafkaShouldBeAvailable {
- t.Fatalf("Kafka broker is not available on %s. Set KAFKA_ADDR to connect to Kafka on a different location.", kafkaAddr)
- } else {
- t.Skipf("Kafka broker is not available on %s. Set KAFKA_ADDR to connect to Kafka on a different location.", kafkaAddr)
- }
- }
- }
- func TestProducingMessages(t *testing.T) {
- checkKafkaAvailability(t)
- client, err := NewClient("functional_test", []string{kafkaAddr}, nil)
- if err != nil {
- t.Fatal(err)
- }
- defer client.Close()
- consumerConfig := NewConsumerConfig()
- consumerConfig.OffsetMethod = OffsetMethodNewest
- consumer, err := NewConsumer(client, "single_partition", 0, "functional_test", consumerConfig)
- if err != nil {
- t.Fatal(err)
- }
- defer consumer.Close()
- producerConfig := NewProducerConfig()
- producerConfig.Partitioner = &ConstantPartitioner{Constant: 0}
- producer, err := NewProducer(client, producerConfig)
- if err != nil {
- t.Fatal(err)
- }
- for i := 1; i <= TestBatchSize; i++ {
- err = producer.SendMessage("single_partition", nil, StringEncoder(fmt.Sprintf("testing %d", i)))
- if err != nil {
- t.Fatal(err)
- }
- }
- producer.Close()
- events := consumer.Events()
- for i := 1; i <= TestBatchSize; i++ {
- select {
- case <-time.After(10 * time.Second):
- t.Fatal("Not received any more events in the last 10 seconds.")
- case event := <-events:
- if string(event.Value) != fmt.Sprintf("testing %d", i) {
- t.Fatalf("Unexpected message with index %d: %s", i, event.Value)
- }
- }
- }
- }
|