Ver código fonte

Add retry logic to GetOffset() just like Leader()

Which is: refresh the metadata and retry once, in case e.g. our connection or
metadata is stale.

Also add a test for it.
Evan Huus 10 anos atrás
pai
commit
f922f12eb9
2 arquivos alterados com 84 adições e 21 exclusões
  1. 36 21
      client.go
  2. 48 0
      client_test.go

+ 36 - 21
client.go

@@ -292,31 +292,16 @@ func (client *client) RefreshMetadata(topics ...string) error {
 }
 
 func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
-	broker, err := client.Leader(topic, partitionID)
-	if err != nil {
-		return -1, err
-	}
-
-	request := &OffsetRequest{}
-	request.AddBlock(topic, partitionID, time, 1)
+	offset, err := client.getOffset(topic, partitionID, time)
 
-	response, err := broker.GetAvailableOffsets(request)
 	if err != nil {
-		return -1, err
-	}
-
-	block := response.GetBlock(topic, partitionID)
-	if block == nil {
-		return -1, ErrIncompleteResponse
-	}
-	if block.Err != ErrNoError {
-		return -1, block.Err
-	}
-	if len(block.Offsets) != 1 {
-		return -1, ErrOffsetOutOfRange
+		if err := client.RefreshMetadata(topic); err != nil {
+			return -1, err
+		}
+		return client.getOffset(topic, partitionID, time)
 	}
 
-	return block.Offsets[0], nil
+	return offset, err
 }
 
 // private broker management helpers
@@ -442,6 +427,36 @@ func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, er
 	return nil, ErrUnknownTopicOrPartition
 }
 
+func (client *client) getOffset(topic string, partitionID int32, time int64) (int64, error) {
+	broker, err := client.Leader(topic, partitionID)
+	if err != nil {
+		return -1, err
+	}
+
+	request := &OffsetRequest{}
+	request.AddBlock(topic, partitionID, time, 1)
+
+	response, err := broker.GetAvailableOffsets(request)
+	if err != nil {
+		_ = broker.Close()
+		return -1, err
+	}
+
+	block := response.GetBlock(topic, partitionID)
+	if block == nil {
+		_ = broker.Close()
+		return -1, ErrIncompleteResponse
+	}
+	if block.Err != ErrNoError {
+		return -1, block.Err
+	}
+	if len(block.Offsets) != 1 {
+		return -1, ErrOffsetOutOfRange
+	}
+
+	return block.Offsets[0], nil
+}
+
 // core metadata update logic
 
 func (client *client) backgroundMetadataUpdater() {

+ 48 - 0
client_test.go

@@ -200,6 +200,54 @@ func TestClientMetadata(t *testing.T) {
 	safeClose(t, client)
 }
 
+func TestClientGetOffset(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
+	leaderAddr := leader.Addr()
+
+	metadata := new(MetadataResponse)
+	metadata.AddTopicPartition("foo", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadata.AddBroker(leaderAddr, leader.BrokerID())
+	seedBroker.Returns(metadata)
+
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	offsetResponse := new(OffsetResponse)
+	offsetResponse.AddTopicPartition("foo", 0, 123)
+	leader.Returns(offsetResponse)
+
+	offset, err := client.GetOffset("foo", 0, OffsetNewest)
+	if err != nil {
+		t.Error(err)
+	}
+	if offset != 123 {
+		t.Error("Unexpected offset, got ", offset)
+	}
+
+	leader.Close()
+	seedBroker.Returns(metadata)
+
+	leader = newMockBrokerAddr(t, 2, leaderAddr)
+	offsetResponse = new(OffsetResponse)
+	offsetResponse.AddTopicPartition("foo", 0, 456)
+	leader.Returns(offsetResponse)
+
+	offset, err = client.GetOffset("foo", 0, OffsetNewest)
+	if err != nil {
+		t.Error(err)
+	}
+	if offset != 456 {
+		t.Error("Unexpected offset, got ", offset)
+	}
+
+	seedBroker.Close()
+	leader.Close()
+	safeClose(t, client)
+}
+
 func TestClientReceivingUnknownTopic(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)