functional_test.go 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. )
  7. const (
  8. TestBatchSize = 1000
  9. )
  10. func TestProducingMessages(t *testing.T) {
  11. client, err := NewClient("functional_test", []string{"localhost:9092"}, nil)
  12. if err != nil {
  13. t.Fatal(err)
  14. }
  15. defer client.Close()
  16. consumerConfig := NewConsumerConfig()
  17. consumerConfig.OffsetMethod = OffsetMethodNewest
  18. consumer, err := NewConsumer(client, "single_partition", 0, "functional_test", consumerConfig)
  19. if err != nil {
  20. t.Fatal(err)
  21. }
  22. defer consumer.Close()
  23. producer, err := NewProducer(client, nil)
  24. if err != nil {
  25. t.Fatal(err)
  26. }
  27. defer producer.Close()
  28. for i := 1; i <= TestBatchSize; i++ {
  29. err = producer.SendMessage("single_partition", nil, StringEncoder(fmt.Sprintf("testing %d", i)))
  30. if err != nil {
  31. t.Fatal(err)
  32. }
  33. }
  34. events := consumer.Events()
  35. for i := 1; i <= TestBatchSize; i++ {
  36. select {
  37. case <-time.After(10 * time.Second):
  38. t.Fatal("Not received any more events in the last 10 seconds.")
  39. case event := <-events:
  40. if string(event.Value) != fmt.Sprintf("testing %d", i) {
  41. t.Fatal("Unexpected message with index %d: %s", i, event.Value)
  42. }
  43. }
  44. }
  45. }