Forráskód Böngészése

Add error handling for empty topics

Simon Eskildsen 12 éve
szülő
commit
3f1ee04b51
3 módosított fájl, 16 hozzáadás és 0 törlés
  1. 8 0
      client.go
  2. 4 0
      consumer.go
  3. 4 0
      producer.go

+ 8 - 0
client.go

@@ -94,6 +94,10 @@ func (client *Client) Close() error {
 
 // Partitions returns the sorted list of available partition IDs for the given topic.
 func (client *Client) Partitions(topic string) ([]int32, error) {
+	if topic == "" {
+		return nil, ConfigurationError("Empty topic")
+	}
+
 	partitions := client.cachedPartitions(topic)
 
 	if partitions == nil {
@@ -127,6 +131,10 @@ func (client *Client) Topics() ([]string, error) {
 // Leader returns the broker object that is the leader of the current topic/partition, as
 // determined by querying the cluster metadata.
 func (client *Client) Leader(topic string, partitionID int32) (*Broker, error) {
+	if topic == "" {
+		return nil, ConfigurationError("Empty topic")
+	}
+
 	leader := client.cachedLeader(topic, partitionID)
 
 	if leader == nil {

+ 4 - 0
consumer.go

@@ -96,6 +96,10 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 		return nil, ConfigurationError("Invalid EventBufferSize")
 	}
 
+	if topic == "" {
+		return nil, ConfigurationError("Empty topic")
+	}
+
 	broker, err := client.Leader(topic, partition)
 	if err != nil {
 		return nil, err

+ 4 - 0
producer.go

@@ -35,6 +35,10 @@ func NewProducer(client *Client, topic string, config *ProducerConfig) (*Produce
 		config.Partitioner = NewRandomPartitioner()
 	}
 
+	if topic == "" {
+		return nil, ConfigurationError("Empty topic")
+	}
+
 	p := new(Producer)
 	p.client = client
 	p.topic = topic