functional_test.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package sarama
  2. import (
  3. "fmt"
  4. "net"
  5. "os"
  6. "testing"
  7. "time"
  8. )
  // TestBatchSize is the number of messages TestProducingMessages sends and
  // then expects to read back from the consumer.
  const TestBatchSize = 1000
  // Package-level state for the functional tests. All three values are written
  // once by init and only read afterwards.
  var (
  	// kafkaIsAvailable is true when init managed to open a TCP connection to
  	// kafkaAddr.
  	kafkaIsAvailable bool
  	// kafkaShouldBeAvailable is true when the CI environment variable is
  	// non-empty.
  	kafkaShouldBeAvailable bool
  	// kafkaAddr is the broker address taken from KAFKA_ADDR, falling back to
  	// "localhost:9092".
  	kafkaAddr string
  )

  // init resolves the broker address, probes it with a plain TCP dial, and
  // records whether the environment (CI) requires the broker to be reachable.
  func init() {
  	// env returns the value of the environment variable key, or defaultValue
  	// when the variable is unset or empty.
  	env := func(key, defaultValue string) string {
  		if value := os.Getenv(key); value != "" {
  			return value
  		}
  		return defaultValue
  	}

  	// Plain assignment, not ":=": a short variable declaration here would
  	// create a function-local kafkaAddr and leave the package-level variable
  	// empty for the tests that read it.
  	kafkaAddr = env("KAFKA_ADDR", "localhost:9092")

  	c, err := net.Dial("tcp", kafkaAddr)
  	if err == nil {
  		kafkaIsAvailable = true
  		// The connection is only a reachability probe; a close error carries
  		// no useful information here.
  		_ = c.Close()
  	}

  	kafkaShouldBeAvailable = env("CI", "") != ""
  }
  31. func checkKafkaAvailability(t *testing.T) {
  32. if !kafkaIsAvailable {
  33. if kafkaShouldBeAvailable {
  34. t.Fatal("Kafka broker is not available")
  35. } else {
  36. t.Skip("Kafka broker is not available")
  37. }
  38. }
  39. }
  40. func TestProducingMessages(t *testing.T) {
  41. checkKafkaAvailability(t)
  42. client, err := NewClient("functional_test", []string{kafkaAddr}, nil)
  43. if err != nil {
  44. t.Fatal(err)
  45. }
  46. defer client.Close()
  47. consumerConfig := NewConsumerConfig()
  48. consumerConfig.OffsetMethod = OffsetMethodNewest
  49. consumer, err := NewConsumer(client, "single_partition", 0, "functional_test", consumerConfig)
  50. if err != nil {
  51. t.Fatal(err)
  52. }
  53. defer consumer.Close()
  54. producer, err := NewProducer(client, nil)
  55. if err != nil {
  56. t.Fatal(err)
  57. }
  58. defer producer.Close()
  59. for i := 1; i <= TestBatchSize; i++ {
  60. err = producer.SendMessage("single_partition", nil, StringEncoder(fmt.Sprintf("testing %d", i)))
  61. if err != nil {
  62. t.Fatal(err)
  63. }
  64. }
  65. events := consumer.Events()
  66. for i := 1; i <= TestBatchSize; i++ {
  67. select {
  68. case <-time.After(10 * time.Second):
  69. t.Fatal("Not received any more events in the last 10 seconds.")
  70. case event := <-events:
  71. if string(event.Value) != fmt.Sprintf("testing %d", i) {
  72. t.Fatal("Unexpected message with index %d: %s", i, event.Value)
  73. }
  74. }
  75. }
  76. }