Преглед на файлове

More refactor, consistent API

Evan Huus преди 12 години
родител
ревизия
f87efdac57
променени са 3 файла, в които са добавени 23 реда и са изтрити 4 реда
  1. 19 0
      client.go
  2. 3 3
      metadata_cache.go
  3. 1 1
      producer.go

+ 19 - 0
client.go

@@ -33,3 +33,22 @@ func (client *Client) Leader(topic string, partition_id int32) (*broker, error)
 
 	return leader, nil
 }
+
+func (client *Client) Partitions(topic string) ([]int32, error) {
+	partitions := client.cache.partitions(topic)
+
+	if partitions == nil {
+		err := client.cache.refreshTopic(topic)
+		if err != nil {
+			return nil, err
+		}
+
+		partitions = client.cache.partitions(topic)
+	}
+
+	if partitions == nil {
+		return nil, UNKNOWN_TOPIC_OR_PARTITION
+	}
+
+	return partitions, nil
+}

+ 3 - 3
metadata_cache.go

@@ -60,13 +60,13 @@ func (mc *metadataCache) any() *broker {
 	return nil
 }
 
-func (mc *metadataCache) partitions(topic string) ([]int32, error) {
+func (mc *metadataCache) partitions(topic string) []int32 {
 	mc.lock.RLock()
 	defer mc.lock.RUnlock()
 
 	partitions := mc.leaders[topic]
 	if partitions == nil {
-		return nil, UNKNOWN_TOPIC_OR_PARTITION
+		return nil
 	}
 
 	ret := make([]int32, len(partitions))
@@ -74,7 +74,7 @@ func (mc *metadataCache) partitions(topic string) ([]int32, error) {
 		ret = append(ret, id)
 	}
 
-	return ret, nil
+	return ret
 }
 
 func (mc *metadataCache) refreshTopics(topics []*string) error {

+ 1 - 1
producer.go

@@ -17,7 +17,7 @@ func NewSimpleProducer(client *Client, topic string) *Producer {
 }
 
 func (p *Producer) SendMessage(key, value encoder) (*ProduceResponse, error) {
-	partitions, err := p.cache.partitions(p.topic)
+	partitions, err := p.Partitions(p.topic)
 	if err != nil {
 		return nil, err
 	}