functional_client_test.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. package sarama
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. func TestFuncConnectionFailure(t *testing.T) {
  7. config := NewConfig()
  8. config.Metadata.Retry.Max = 1
  9. _, err := NewClient([]string{"localhost:9000"}, config)
  10. if err != ErrOutOfBrokers {
  11. t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
  12. }
  13. }
  14. func TestFuncClientMetadata(t *testing.T) {
  15. checkKafkaAvailability(t)
  16. config := NewConfig()
  17. config.Metadata.Retry.Max = 1
  18. config.Metadata.Retry.Backoff = 10 * time.Millisecond
  19. client, err := NewClient(kafkaBrokers, config)
  20. if err != nil {
  21. t.Fatal(err)
  22. }
  23. if err := client.RefreshMetadata("unknown_topic"); err != ErrUnknownTopicOrPartition {
  24. t.Error("Expected ErrUnknownTopicOrPartition, got", err)
  25. }
  26. if _, err := client.Leader("unknown_topic", 0); err != ErrUnknownTopicOrPartition {
  27. t.Error("Expected ErrUnknownTopicOrPartition, got", err)
  28. }
  29. if _, err := client.Replicas("invalid/topic", 0); err != ErrUnknownTopicOrPartition {
  30. t.Error("Expected ErrUnknownTopicOrPartition, got", err)
  31. }
  32. partitions, err := client.Partitions("multi_partition")
  33. if err != nil {
  34. t.Error(err)
  35. }
  36. if len(partitions) != 2 {
  37. t.Errorf("Expected multi_partition topic to have 2 partitions, found %v", partitions)
  38. }
  39. partitions, err = client.Partitions("single_partition")
  40. if err != nil {
  41. t.Error(err)
  42. }
  43. if len(partitions) != 1 {
  44. t.Errorf("Expected single_partition topic to have 1 partitions, found %v", partitions)
  45. }
  46. safeClose(t, client)
  47. }