Explorar el Código

Merge pull request #348 from Shopify/drop_offsettime

Remove OffsetTime type, just use int64.
Willem van Bergen hace 10 años
padre
commit
6877c6e9a2
Se han modificado 3 ficheros con 22 adiciones y 33 borrados
  1. 16 4
      client.go
  2. 4 15
      consumer.go
  3. 2 14
      offset_request.go

+ 16 - 4
client.go

@@ -38,8 +38,9 @@ type Client interface {
 	RefreshMetadata(topics ...string) error
 
 	// GetOffset queries the cluster to get the most recent available offset at the given
-	// time on the topic/partition combination.
-	GetOffset(topic string, partitionID int32, where OffsetTime) (int64, error)
+	// time on the topic/partition combination. Time should be OffsetOldest for the earliest available
+	// offset, OffsetNewest for the offset of the message that will be produced next, or a time.
+	GetOffset(topic string, partitionID int32, time int64) (int64, error)
 
 	// Close shuts down all broker connections managed by this client. It is required to call this function before
 	// a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
@@ -50,6 +51,17 @@ type Client interface {
 	Closed() bool
 }
 
+const (
+	// OffsetNewest stands for the log head offset, i.e. the offset that will be assigned to the next message
+	// that will be produced to the partition. You can send this to a client's GetOffset method to get this
+	// offset, or when calling ConsumePartition to start consuming new messages.
+	OffsetNewest int64 = -1
+	// OffsetOldest stands for the oldest offset available on the broker for a partition. You can send this
+	// to a client's GetOffset method to get this offset, or when calling ConsumePartition to start consuming
+	// from the oldest offset that is still available on the broker.
+	OffsetOldest int64 = -2
+)
+
 type client struct {
 	conf   *Config
 	closer chan none
@@ -259,14 +271,14 @@ func (client *client) RefreshMetadata(topics ...string) error {
 	return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max)
 }
 
-func (client *client) GetOffset(topic string, partitionID int32, where OffsetTime) (int64, error) {
+func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
 	broker, err := client.Leader(topic, partitionID)
 	if err != nil {
 		return -1, err
 	}
 
 	request := &OffsetRequest{}
-	request.AddBlock(topic, partitionID, where, 1)
+	request.AddBlock(topic, partitionID, time, 1)
 
 	response, err := broker.GetAvailableOffsets(request)
 	if err != nil {

+ 4 - 15
consumer.go

@@ -97,15 +97,6 @@ func (c *consumer) Close() error {
 	return nil
 }
 
-const (
-	// OffsetNewest causes the consumer to start at the most recent available offset, as
-	// determined by querying the broker.
-	OffsetNewest int64 = -1
-	// OffsetOldest causes the consumer to start at the oldest available offset, as
-	// determined by querying the broker.
-	OffsetOldest int64 = -2
-)
-
 func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
 	child := &partitionConsumer{
 		consumer:  c,
@@ -314,13 +305,11 @@ func (child *partitionConsumer) dispatch() error {
 }
 
 func (child *partitionConsumer) chooseStartingOffset(offset int64) (err error) {
-	var where OffsetTime
+	var time int64
 
 	switch offset {
-	case OffsetNewest:
-		where = LatestOffsets
-	case OffsetOldest:
-		where = EarliestOffset
+	case OffsetNewest, OffsetOldest:
+		time = offset
 	default:
 		if offset < 0 {
 			return ConfigurationError("Invalid offset")
@@ -329,7 +318,7 @@ func (child *partitionConsumer) chooseStartingOffset(offset int64) (err error) {
 		return nil
 	}
 
-	child.offset, err = child.consumer.client.GetOffset(child.topic, child.partition, where)
+	child.offset, err = child.consumer.client.GetOffset(child.topic, child.partition, time)
 	return err
 }
 

+ 2 - 14
offset_request.go

@@ -1,19 +1,7 @@
 package sarama
 
-// OffsetTime is used in Offset Requests to ask for all messages before a certain time. Any positive int64
-// value will be interpreted as milliseconds, or use the special constants defined here.
-type OffsetTime int64
-
-const (
-	// LatestOffsets askes for the latest offsets.
-	LatestOffsets OffsetTime = -1
-	// EarliestOffset askes for the earliest available offset. Note that because offsets are pulled in descending order,
-	// asking for the earliest offset will always return you a single element.
-	EarliestOffset OffsetTime = -2
-)
-
 type offsetRequestBlock struct {
-	time       OffsetTime
+	time       int64
 	maxOffsets int32
 }
 
@@ -61,7 +49,7 @@ func (r *OffsetRequest) version() int16 {
 	return 0
 }
 
-func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time OffsetTime, maxOffsets int32) {
+func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
 	if r.blocks == nil {
 		r.blocks = make(map[string]map[int32]*offsetRequestBlock)
 	}