Evan Huus 10 years ago
parent
commit
b15933a415
4 changed files with 38 additions and 14 deletions
  1. 24 2
      client_test.go
  2. 4 4
      consumer_test.go
  3. 3 1
      metadata_response.go
  4. 7 7
      producer_test.go

+ 24 - 2
client_test.go

@@ -55,10 +55,12 @@ func TestClientMetadata(t *testing.T) {
 
 	mb1 := NewMockBroker(t, 1)
 	mb5 := NewMockBroker(t, 5)
+	replicas := []int32{3, 1, 5}
+	isr := []int32{5, 1}
 
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb5.Addr(), mb5.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, mb5.BrokerID())
+	mdr.AddTopicPartition("my_topic", 0, mb5.BrokerID(), replicas, isr)
 	mb1.Returns(mdr)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
@@ -89,6 +91,26 @@ func TestClientMetadata(t *testing.T) {
 	} else if tst.ID() != 5 {
 		t.Error("Leader for my_topic had incorrect ID.")
 	}
+
+	replicas, err = client.Replicas("my_topic", 0)
+	if err != nil {
+		t.Error(err)
+	} else if replicas[0] != 1 {
+		t.Error("Incorrect (or unsorted) replica")
+	} else if replicas[1] != 3 {
+		t.Error("Incorrect (or unsorted) replica")
+	} else if replicas[2] != 5 {
+		t.Error("Incorrect (or unsorted) replica")
+	}
+
+	isr, err = client.ReplicasInSync("my_topic", 0)
+	if err != nil {
+		t.Error(err)
+	} else if isr[0] != 1 {
+		t.Error("Incorrect (or unsorted) isr")
+	} else if isr[1] != 5 {
+		t.Error("Incorrect (or unsorted) isr")
+	}
 }
 
 func TestClientRefreshBehaviour(t *testing.T) {
@@ -100,7 +122,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	mb1.Returns(mdr)
 
 	mdr2 := new(MetadataResponse)
-	mdr2.AddTopicPartition("my_topic", 0xb, mb5.BrokerID())
+	mdr2.AddTopicPartition("my_topic", 0xb, mb5.BrokerID(), nil, nil)
 	mb5.Returns(mdr2)
 
 	client, err := NewClient("clientID", []string{mb1.Addr()}, nil)

+ 4 - 4
consumer_test.go

@@ -19,7 +19,7 @@ func TestSimpleConsumer(t *testing.T) {
 
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2)
+	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil)
 	mb1.Returns(mdr)
 
 	for i := 0; i < 10; i++ {
@@ -62,7 +62,7 @@ func TestConsumerRawOffset(t *testing.T) {
 
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2)
+	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil)
 	mb1.Returns(mdr)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
@@ -95,7 +95,7 @@ func TestConsumerLatestOffset(t *testing.T) {
 
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2)
+	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil)
 	mb1.Returns(mdr)
 
 	or := new(OffsetResponse)
@@ -130,7 +130,7 @@ func TestConsumerPrelude(t *testing.T) {
 
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2)
+	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil)
 	mb1.Returns(mdr)
 
 	fr := new(FetchResponse)

+ 3 - 1
metadata_response.go

@@ -182,7 +182,7 @@ func (m *MetadataResponse) AddBroker(addr string, id int32) {
 	m.Brokers = append(m.Brokers, &Broker{id: id, addr: addr})
 }
 
-func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32) {
+func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32) {
 	var match *TopicMetadata
 
 	for _, tm := range m.Topics {
@@ -214,5 +214,7 @@ foundTopic:
 foundPartition:
 
 	pmatch.Leader = brokerID
+	pmatch.Replicas = replicas
+	pmatch.Isr = isr
 
 }

+ 7 - 7
producer_test.go

@@ -22,7 +22,7 @@ func TestSimpleProducer(t *testing.T) {
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, 2)
+	response1.AddTopicPartition("my_topic", 0, 2, nil, nil)
 	broker1.Returns(response1)
 
 	response2 := new(ProduceResponse)
@@ -59,7 +59,7 @@ func TestProducer(t *testing.T) {
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil)
 	broker1.Returns(response1)
 
 	response2 := new(ProduceResponse)
@@ -101,7 +101,7 @@ func TestProducerMultipleFlushes(t *testing.T) {
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil)
 	broker1.Returns(response1)
 
 	response2 := new(ProduceResponse)
@@ -150,8 +150,8 @@ func TestProducerMultipleBrokers(t *testing.T) {
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
 	response1.AddBroker(broker3.Addr(), broker3.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 1, broker3.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil)
+	response1.AddTopicPartition("my_topic", 1, broker3.BrokerID(), nil, nil)
 	broker1.Returns(response1)
 
 	response2 := new(ProduceResponse)
@@ -197,7 +197,7 @@ func TestProducerFailureRetry(t *testing.T) {
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil)
 	broker1.Returns(response1)
 
 	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
@@ -223,7 +223,7 @@ func TestProducerFailureRetry(t *testing.T) {
 
 	response3 := new(MetadataResponse)
 	response3.AddBroker(broker3.Addr(), broker3.BrokerID())
-	response3.AddTopicPartition("my_topic", 0, broker3.BrokerID())
+	response3.AddTopicPartition("my_topic", 0, broker3.BrokerID(), nil, nil)
 	broker2.Returns(response3)
 
 	response4 := new(ProduceResponse)