functional_client_test.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. //+build functional
  2. package sarama
  3. import (
  4. "fmt"
  5. "testing"
  6. "time"
  7. )
  8. func TestFuncConnectionFailure(t *testing.T) {
  9. setupFunctionalTest(t)
  10. defer teardownFunctionalTest(t)
  11. FunctionalTestEnv.Proxies["kafka1"].Enabled = false
  12. SaveProxy(t, "kafka1")
  13. config := NewConfig()
  14. config.Metadata.Retry.Max = 1
  15. _, err := NewClient([]string{FunctionalTestEnv.KafkaBrokerAddrs[0]}, config)
  16. if err != ErrOutOfBrokers {
  17. t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
  18. }
  19. }
  20. func TestFuncClientMetadata(t *testing.T) {
  21. setupFunctionalTest(t)
  22. defer teardownFunctionalTest(t)
  23. config := NewConfig()
  24. config.Metadata.Retry.Max = 1
  25. config.Metadata.Retry.Backoff = 10 * time.Millisecond
  26. client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
  27. if err != nil {
  28. t.Fatal(err)
  29. }
  30. if err := client.RefreshMetadata("unknown_topic"); err != ErrUnknownTopicOrPartition {
  31. t.Error("Expected ErrUnknownTopicOrPartition, got", err)
  32. }
  33. if _, err := client.Leader("unknown_topic", 0); err != ErrUnknownTopicOrPartition {
  34. t.Error("Expected ErrUnknownTopicOrPartition, got", err)
  35. }
  36. if _, err := client.Replicas("invalid/topic", 0); err != ErrUnknownTopicOrPartition {
  37. t.Error("Expected ErrUnknownTopicOrPartition, got", err)
  38. }
  39. partitions, err := client.Partitions("test.4")
  40. if err != nil {
  41. t.Error(err)
  42. }
  43. if len(partitions) != 4 {
  44. t.Errorf("Expected test.4 topic to have 4 partitions, found %v", partitions)
  45. }
  46. partitions, err = client.Partitions("test.1")
  47. if err != nil {
  48. t.Error(err)
  49. }
  50. if len(partitions) != 1 {
  51. t.Errorf("Expected test.1 topic to have 1 partitions, found %v", partitions)
  52. }
  53. safeClose(t, client)
  54. }
  55. func TestFuncClientCoordinator(t *testing.T) {
  56. checkKafkaVersion(t, "0.8.2")
  57. setupFunctionalTest(t)
  58. defer teardownFunctionalTest(t)
  59. client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
  60. if err != nil {
  61. t.Fatal(err)
  62. }
  63. for i := 0; i < 10; i++ {
  64. broker, err := client.Coordinator(fmt.Sprintf("another_new_consumer_group_%d", i))
  65. if err != nil {
  66. t.Fatal(err)
  67. }
  68. if connected, err := broker.Connected(); !connected || err != nil {
  69. t.Errorf("Expected to coordinator %s broker to be properly connected.", broker.Addr())
  70. }
  71. }
  72. safeClose(t, client)
  73. }