Browse Source

add GetCurrentOffset to client

Nick Evans 11 years ago
parent
commit
6352c9917d
2 changed files with 51 additions and 35 deletions
  1. 30 0
      client.go
  2. 21 35
      consumer.go

+ 30 - 0
client.go

@@ -165,6 +165,36 @@ func (client *Client) RefreshAllMetadata() error {
 	return client.refreshMetadata(make([]string, 0), client.config.MetadataRetries)
 }
 
+// GetOffset queries the cluster to get the most recent available offset at the given
+// time on the topic/partition combination.
+func (client *Client) GetOffset(topic string, partitionID int32, where OffsetTime) (int64, error) {
+	broker, err := client.Leader(topic, partitionID)
+	if err != nil {
+		return -1, err
+	}
+
+	request := &OffsetRequest{}
+	request.AddBlock(topic, partitionID, where, 1)
+
+	response, err := broker.GetAvailableOffsets(client.id, request)
+	if err != nil {
+		return -1, err
+	}
+
+	block := response.GetBlock(topic, partitionID)
+	if block == nil {
+		return -1, IncompleteResponse
+	}
+	if block.Err != NoError {
+		return -1, block.Err
+	}
+	if len(block.Offsets) != 1 {
+		return -1, IncompleteResponse
+	}
+
+	return block.Offsets[0], nil
+}
+
 // misc private helper functions
 
 // XXX: see https://github.com/Shopify/sarama/issues/15

+ 21 - 35
consumer.go

@@ -269,10 +269,8 @@ func (c *Consumer) fetchMessages() {
 }
 
 func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
-	request := &OffsetRequest{}
-	request.AddBlock(c.topic, c.partition, where, 1)
+	offset, err := c.client.GetOffset(c.topic, c.partition, where)
 
-	response, err := c.broker.GetAvailableOffsets(c.client.id, request)
 	switch err {
 	case nil:
 		break
@@ -282,42 +280,30 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
 		if !retry {
 			return -1, err
 		}
-		Logger.Printf("Unexpected error processing OffsetRequest; disconnecting broker %s: %s\n", c.broker.addr, err)
-		c.client.disconnectBroker(c.broker)
-		c.broker, err = c.client.Leader(c.topic, c.partition)
-		if err != nil {
-			return -1, err
-		}
-		return c.getOffset(where, false)
-	}
 
-	block := response.GetBlock(c.topic, c.partition)
-	if block == nil {
-		return -1, IncompleteResponse
-	}
+		switch err.(type) {
+		case KError:
+			switch err {
+			case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
+				err = c.client.RefreshTopicMetadata(c.topic)
+				if err != nil {
+					return -1, err
+				}
+			default:
+				Logger.Printf("Unexpected error processing OffsetRequest; disconnecting broker %s: %s\n", c.broker.addr, err)
+				c.client.disconnectBroker(c.broker)
 
-	switch block.Err {
-	case NoError:
-		if len(block.Offsets) < 1 {
-			return -1, IncompleteResponse
-		}
-		return block.Offsets[0], nil
-	case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
-		if !retry {
-			return -1, block.Err
-		}
-		err = c.client.RefreshTopicMetadata(c.topic)
-		if err != nil {
-			return -1, err
-		}
-		c.broker, err = c.client.Leader(c.topic, c.partition)
-		if err != nil {
-			return -1, err
+				broker, brokerErr := c.client.Leader(c.topic, c.partition)
+				if brokerErr != nil {
+					return -1, brokerErr
+				}
+				c.broker = broker
+			}
+			return c.getOffset(where, false)
 		}
-		return c.getOffset(where, false)
+		return -1, err
 	}
-
-	return -1, block.Err
+	return offset, nil
 }
 
 // Creates a ConsumerConfig instance with sane defaults.