Kaynağa Gözat

Add Replicas() and ReplicasInSync() to client

Switch the 'leader' hash to store the entire PartitionMetadata instead of just
the leaderID. After that quite straightforward.
Evan Huus 11 yıl önce
ebeveyn
işleme
bb546b5ff2
2 değiştirilmiş dosya ile 77 ekleme ve 16 silme
  1. 63 15
      client.go
  2. 14 1
      utils.go

+ 63 - 15
client.go

@@ -30,9 +30,9 @@ type Client struct {
 	seedBroker      *Broker
 	seedBroker      *Broker
 	deadBrokerAddrs map[string]struct{}
 	deadBrokerAddrs map[string]struct{}
 
 
-	brokers map[int32]*Broker          // maps broker ids to brokers
-	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
-	lock    sync.RWMutex               // protects access to the maps, only one since they're always written together
+	brokers  map[int32]*Broker                       // maps broker ids to brokers
+	metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
+	lock     sync.RWMutex                            // protects access to the maps, only one since they're always written together
 }
 }
 
 
 // NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses
 // NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses
@@ -61,7 +61,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 		seedBroker:      NewBroker(addrs[0]),
 		seedBroker:      NewBroker(addrs[0]),
 		deadBrokerAddrs: make(map[string]struct{}),
 		deadBrokerAddrs: make(map[string]struct{}),
 		brokers:         make(map[int32]*Broker),
 		brokers:         make(map[int32]*Broker),
-		leaders:         make(map[string]map[int32]int32),
+		metadata:        make(map[string]map[int32]*PartitionMetadata),
 	}
 	}
 	_ = client.seedBroker.Open(config.DefaultBrokerConf)
 	_ = client.seedBroker.Open(config.DefaultBrokerConf)
 
 
@@ -104,7 +104,7 @@ func (client *Client) Close() error {
 		safeAsyncClose(broker)
 		safeAsyncClose(broker)
 	}
 	}
 	client.brokers = nil
 	client.brokers = nil
-	client.leaders = nil
+	client.metadata = nil
 
 
 	if client.seedBroker != nil {
 	if client.seedBroker != nil {
 		safeAsyncClose(client.seedBroker)
 		safeAsyncClose(client.seedBroker)
@@ -155,14 +155,50 @@ func (client *Client) Topics() ([]string, error) {
 	client.lock.RLock()
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 	defer client.lock.RUnlock()
 
 
-	ret := make([]string, 0, len(client.leaders))
-	for topic := range client.leaders {
+	ret := make([]string, 0, len(client.metadata))
+	for topic := range client.metadata {
 		ret = append(ret, topic)
 		ret = append(ret, topic)
 	}
 	}
 
 
 	return ret, nil
 	return ret, nil
 }
 }
 
 
+func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error) {
+	metadata := client.cachedMetadata(topic, partitionID)
+
+	if metadata == nil {
+		err := client.RefreshTopicMetadata(topic)
+		if err != nil {
+			return nil, err
+		}
+		metadata = client.cachedMetadata(topic, partitionID)
+	}
+
+	if metadata == nil {
+		return nil, UnknownTopicOrPartition
+	}
+
+	return dupeAndSort(metadata.Replicas), nil
+}
+
+func (client *Client) ReplicasInSync(topic string, partitionID int32) ([]int32, error) {
+	metadata := client.cachedMetadata(topic, partitionID)
+
+	if metadata == nil {
+		err := client.RefreshTopicMetadata(topic)
+		if err != nil {
+			return nil, err
+		}
+		metadata = client.cachedMetadata(topic, partitionID)
+	}
+
+	if metadata == nil {
+		return nil, UnknownTopicOrPartition
+	}
+
+	return dupeAndSort(metadata.Isr), nil
+}
+
 // Leader returns the broker object that is the leader of the current topic/partition, as
 // Leader returns the broker object that is the leader of the current topic/partition, as
 // determined by querying the cluster metadata.
 // determined by querying the cluster metadata.
 func (client *Client) Leader(topic string, partitionID int32) (*Broker, error) {
 func (client *Client) Leader(topic string, partitionID int32) (*Broker, error) {
@@ -352,22 +388,34 @@ func (client *Client) cachedLeader(topic string, partitionID int32) *Broker {
 	client.lock.RLock()
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 	defer client.lock.RUnlock()
 
 
-	partitions := client.leaders[topic]
+	partitions := client.metadata[topic]
 	if partitions != nil {
 	if partitions != nil {
-		leader, ok := partitions[partitionID]
+		metadata, ok := partitions[partitionID]
 		if ok {
 		if ok {
-			return client.brokers[leader]
+			return client.brokers[metadata.Leader]
 		}
 		}
 	}
 	}
 
 
 	return nil
 	return nil
 }
 }
 
 
+func (client *Client) cachedMetadata(topic string, partitionID int32) *PartitionMetadata {
+	client.lock.RLock()
+	defer client.lock.RUnlock()
+
+	partitions := client.metadata[topic]
+	if partitions != nil {
+		return partitions[partitionID]
+	}
+
+	return nil
+}
+
 func (client *Client) cachedPartitions(topic string) []int32 {
 func (client *Client) cachedPartitions(topic string) []int32 {
 	client.lock.RLock()
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 	defer client.lock.RUnlock()
 
 
-	partitions := client.leaders[topic]
+	partitions := client.metadata[topic]
 	if partitions == nil {
 	if partitions == nil {
 		return nil
 		return nil
 	}
 	}
@@ -436,22 +484,22 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 		default:
 		default:
 			return nil, topic.Err
 			return nil, topic.Err
 		}
 		}
-		client.leaders[topic.Name] = make(map[int32]int32, len(topic.Partitions))
+		client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
 		for _, partition := range topic.Partitions {
 		for _, partition := range topic.Partitions {
 			switch partition.Err {
 			switch partition.Err {
 			case LeaderNotAvailable:
 			case LeaderNotAvailable:
 				toRetry[topic.Name] = true
 				toRetry[topic.Name] = true
-				delete(client.leaders[topic.Name], partition.ID)
+				delete(client.metadata[topic.Name], partition.ID)
 			case NoError:
 			case NoError:
 				broker := client.brokers[partition.Leader]
 				broker := client.brokers[partition.Leader]
 				if _, present := client.deadBrokerAddrs[broker.Addr()]; present {
 				if _, present := client.deadBrokerAddrs[broker.Addr()]; present {
 					if connected, _ := broker.Connected(); !connected {
 					if connected, _ := broker.Connected(); !connected {
 						toRetry[topic.Name] = true
 						toRetry[topic.Name] = true
-						delete(client.leaders[topic.Name], partition.ID)
+						delete(client.metadata[topic.Name], partition.ID)
 						continue
 						continue
 					}
 					}
 				}
 				}
-				client.leaders[topic.Name][partition.ID] = partition.Leader
+				client.metadata[topic.Name][partition.ID] = partition
 			default:
 			default:
 				return nil, partition.Err
 				return nil, partition.Err
 			}
 			}

+ 14 - 1
utils.go

@@ -1,6 +1,9 @@
 package sarama
 package sarama
 
 
-import "io"
+import (
+	"io"
+	"sort"
+)
 
 
 // make []int32 sortable so we can sort partition numbers
 // make []int32 sortable so we can sort partition numbers
 type int32Slice []int32
 type int32Slice []int32
@@ -17,6 +20,16 @@ func (slice int32Slice) Swap(i, j int) {
 	slice[i], slice[j] = slice[j], slice[i]
 	slice[i], slice[j] = slice[j], slice[i]
 }
 }
 
 
+func dupeAndSort(input []int32) []int32 {
+	ret := make([]int32, 0, len(input))
+	for _, val := range input {
+		ret = append(ret, val)
+	}
+
+	sort.Sort(int32Slice(ret))
+	return ret
+}
+
 func withRecover(fn func()) {
 func withRecover(fn func()) {
 	defer func() {
 	defer func() {
 		handler := PanicHandler
 		handler := PanicHandler