functional_client_test.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. )
  7. func TestFuncConnectionFailure(t *testing.T) {
  8. setupFunctionalTest(t)
  9. defer teardownFunctionalTest(t)
  10. Proxies["kafka1"].Enabled = false
  11. SaveProxy(t, "kafka1")
  12. config := NewConfig()
  13. config.Metadata.Retry.Max = 1
  14. _, err := NewClient([]string{kafkaBrokers[0]}, config)
  15. if err != ErrOutOfBrokers {
  16. t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
  17. }
  18. }
  19. func TestFuncClientMetadata(t *testing.T) {
  20. setupFunctionalTest(t)
  21. defer teardownFunctionalTest(t)
  22. config := NewConfig()
  23. config.Metadata.Retry.Max = 1
  24. config.Metadata.Retry.Backoff = 10 * time.Millisecond
  25. client, err := NewClient(kafkaBrokers, config)
  26. if err != nil {
  27. t.Fatal(err)
  28. }
  29. if err := client.RefreshMetadata("unknown_topic"); err != ErrUnknownTopicOrPartition {
  30. t.Error("Expected ErrUnknownTopicOrPartition, got", err)
  31. }
  32. if _, err := client.Leader("unknown_topic", 0); err != ErrUnknownTopicOrPartition {
  33. t.Error("Expected ErrUnknownTopicOrPartition, got", err)
  34. }
  35. if _, err := client.Replicas("invalid/topic", 0); err != ErrUnknownTopicOrPartition {
  36. t.Error("Expected ErrUnknownTopicOrPartition, got", err)
  37. }
  38. partitions, err := client.Partitions("test.4")
  39. if err != nil {
  40. t.Error(err)
  41. }
  42. if len(partitions) != 4 {
  43. t.Errorf("Expected test.4 topic to have 4 partitions, found %v", partitions)
  44. }
  45. partitions, err = client.Partitions("test.1")
  46. if err != nil {
  47. t.Error(err)
  48. }
  49. if len(partitions) != 1 {
  50. t.Errorf("Expected test.1 topic to have 1 partitions, found %v", partitions)
  51. }
  52. safeClose(t, client)
  53. }
  54. func TestFuncClientCoordinator(t *testing.T) {
  55. checkKafkaVersion(t, "0.8.2")
  56. setupFunctionalTest(t)
  57. defer teardownFunctionalTest(t)
  58. client, err := NewClient(kafkaBrokers, nil)
  59. if err != nil {
  60. t.Fatal(err)
  61. }
  62. for i := 0; i < 10; i++ {
  63. broker, err := client.Coordinator(fmt.Sprintf("another_new_consumer_group_%d", i))
  64. if err != nil {
  65. t.Fatal(err)
  66. }
  67. if connected, err := broker.Connected(); !connected || err != nil {
  68. t.Errorf("Expected to coordinator %s broker to be properly connected.", broker.Addr())
  69. }
  70. }
  71. safeClose(t, client)
  72. }