Procházet zdrojové kódy

Merge pull request #38 from Shopify/empty-topics

Add error handling for empty topics
Simon Hørup Eskildsen před 12 roky
rodič
revize
7cf7edc45d
3 změnil soubory, kde provedl 17 přidání a 0 odebrání
  1. 9 0
      client.go
  2. 4 0
      consumer.go
  3. 4 0
      producer.go

+ 9 - 0
client.go

@@ -184,6 +184,15 @@ func (client *Client) disconnectBroker(broker *Broker) {
 }
 
 func (client *Client) refreshMetadata(topics []string, retries int) error {
+	// Kafka will throw exceptions on an empty topic and not return a proper
+	// error. This handles the case by returning an error instead of sending it
+	// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
+	for _, topic := range topics {
+		if len(topic) == 0 {
+			return NoSuchTopic
+		}
+	}
+
 	for broker := client.any(); broker != nil; broker = client.any() {
 		response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics})
 

+ 4 - 0
consumer.go

@@ -96,6 +96,10 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 		return nil, ConfigurationError("Invalid EventBufferSize")
 	}
 
+	if topic == "" {
+		return nil, ConfigurationError("Empty topic")
+	}
+
 	broker, err := client.Leader(topic, partition)
 	if err != nil {
 		return nil, err

+ 4 - 0
producer.go

@@ -35,6 +35,10 @@ func NewProducer(client *Client, topic string, config *ProducerConfig) (*Produce
 		config.Partitioner = NewRandomPartitioner()
 	}
 
+	if topic == "" {
+		return nil, ConfigurationError("Empty topic")
+	}
+
 	p := new(Producer)
 	p.client = client
 	p.topic = topic