Quellcode durchsuchen

Merge pull request #872 from pd/isrs

Add client.Isr to determine in-sync replicas
Evan Huus vor 8 Jahren
Ursprung
Commit
bdb312ef7a
2 geänderte Dateien mit 41 neuen und 0 gelöschten Zeilen
  1. 30 0
      client.go
  2. 11 0
      client_test.go

+ 30 - 0
client.go

@@ -38,6 +38,11 @@ type Client interface {
 	// Replicas returns the set of all replica IDs for the given partition.
 	Replicas(topic string, partitionID int32) ([]int32, error)
 
+	// InSyncReplicas returns the set of all in-sync replica IDs for the given
+	// partition. In-sync replicas are replicas which are fully caught up with
+	// the partition leader.
+	InSyncReplicas(topic string, partitionID int32) ([]int32, error)
+
 	// RefreshMetadata takes a list of topics and queries the cluster to refresh the
 	// available metadata for those topics. If no topics are provided, it will refresh
 	// metadata for all topics.
@@ -295,6 +300,31 @@ func (client *client) Replicas(topic string, partitionID int32) ([]int32, error)
 	return dupeAndSort(metadata.Replicas), nil
 }
 
+func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) {
+	if client.Closed() {
+		return nil, ErrClosedClient
+	}
+
+	metadata := client.cachedMetadata(topic, partitionID)
+
+	if metadata == nil {
+		err := client.RefreshMetadata(topic)
+		if err != nil {
+			return nil, err
+		}
+		metadata = client.cachedMetadata(topic, partitionID)
+	}
+
+	if metadata == nil {
+		return nil, ErrUnknownTopicOrPartition
+	}
+
+	if metadata.Err == ErrReplicaNotAvailable {
+		return nil, metadata.Err
+	}
+	return dupeAndSort(metadata.Isr), nil
+}
+
 func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
 	if client.Closed() {
 		return nil, ErrClosedClient

+ 11 - 0
client_test.go

@@ -196,6 +196,17 @@ func TestClientMetadata(t *testing.T) {
 		t.Error("Incorrect (or unsorted) replica")
 	}
 
+	isr, err = client.InSyncReplicas("my_topic", 0)
+	if err != nil {
+		t.Error(err)
+	} else if len(isr) != 2 {
+		t.Error("Client returned incorrect ISRs for partition:", isr)
+	} else if isr[0] != 1 {
+		t.Error("Incorrect (or unsorted) ISR:", isr)
+	} else if isr[1] != 5 {
+		t.Error("Incorrect (or unsorted) ISR:", isr)
+	}
+
 	leader.Close()
 	seedBroker.Close()
 	safeClose(t, client)