|
|
@@ -58,6 +58,48 @@ func TestFuncConnectionFailure(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func TestFuncClientMetadata(t *testing.T) {
|
|
|
+ checkKafkaAvailability(t)
|
|
|
+
|
|
|
+ config := NewConfig()
|
|
|
+ config.Metadata.Retry.Max = 1
|
|
|
+ config.Metadata.Retry.Backoff = 10 * time.Millisecond
|
|
|
+ client, err := NewClient(kafkaBrokers, config)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := client.RefreshMetadata("unknown_topic"); err != ErrUnknownTopicOrPartition {
|
|
|
+ t.Error("Expected ErrUnknownTopicOrPartition, got", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if _, err := client.Leader("unknown_topic", 0); err != ErrUnknownTopicOrPartition {
|
|
|
+ t.Error("Expected ErrUnknownTopicOrPartition, got", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if _, err := client.Replicas("invalid/topic", 0); err != ErrUnknownTopicOrPartition {
|
|
|
+ t.Error("Expected ErrUnknownTopicOrPartition, got", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ partitions, err := client.Partitions("multi_partition")
|
|
|
+ if err != nil {
|
|
|
+ t.Error(err)
|
|
|
+ }
|
|
|
+ if len(partitions) != 2 {
|
|
|
+ t.Errorf("Expected multi_partition topic to have 2 partitions, found %v", partitions)
|
|
|
+ }
|
|
|
+
|
|
|
+ partitions, err = client.Partitions("single_partition")
|
|
|
+ if err != nil {
|
|
|
+ t.Error(err)
|
|
|
+ }
|
|
|
+ if len(partitions) != 1 {
|
|
|
+ t.Errorf("Expected single_partition topic to have 1 partitions, found %v", partitions)
|
|
|
+ }
|
|
|
+
|
|
|
+ safeClose(t, client)
|
|
|
+}
|
|
|
+
|
|
|
func TestFuncProducing(t *testing.T) {
|
|
|
config := NewConfig()
|
|
|
testProducingMessages(t, config)
|
|
|
@@ -124,6 +166,25 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func TestProducingToInvalidTopic(t *testing.T) {
|
|
|
+ checkKafkaAvailability(t)
|
|
|
+
|
|
|
+ producer, err := NewSyncProducer(kafkaBrokers, nil)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
|
|
|
+ t.Error("Expected ErrUnknownTopicOrPartition, found", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
|
|
|
+ t.Error("Expected ErrUnknownTopicOrPartition, found", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ safeClose(t, producer)
|
|
|
+}
|
|
|
+
|
|
|
func testProducingMessages(t *testing.T, config *Config) {
|
|
|
checkKafkaAvailability(t)
|
|
|
|