Browse Source

Remove OffsetTime in lieu of int64.

Willem van Bergen 10 years ago
parent
commit
a9d8315553
3 changed files with 11 additions and 24 deletions
  1. 5 4
      client.go
  2. 4 6
      consumer.go
  3. 2 14
      offset_request.go

+ 5 - 4
client.go

@@ -43,8 +43,9 @@ type Client interface {
 	RefreshMetadata(topics ...string) error
 
 	// GetOffset queries the cluster to get the most recent available offset at the given
-	// time on the topic/partition combination.
-	GetOffset(topic string, partitionID int32, where OffsetTime) (int64, error)
+	// time on the topic/partition combination. Time should be OffsetOldest for the earliest available
+	// offset, OffsetNewest for the offset of the message that will be produced next, or a time.
+	GetOffset(topic string, partitionID int32, time int64) (int64, error)
 
 	// Close shuts down all broker connections managed by this client. It is required to call this function before
 	// a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
@@ -281,14 +282,14 @@ func (client *client) RefreshMetadata(topics ...string) error {
 	return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max)
 }
 
-func (client *client) GetOffset(topic string, partitionID int32, where OffsetTime) (int64, error) {
+func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
 	broker, err := client.Leader(topic, partitionID)
 	if err != nil {
 		return -1, err
 	}
 
 	request := &OffsetRequest{}
-	request.AddBlock(topic, partitionID, where, 1)
+	request.AddBlock(topic, partitionID, time, 1)
 
 	response, err := broker.GetAvailableOffsets(request)
 	if err != nil {

+ 4 - 6
consumer.go

@@ -314,13 +314,11 @@ func (child *partitionConsumer) dispatch() error {
 }
 
 func (child *partitionConsumer) chooseStartingOffset(offset int64) (err error) {
-	var where OffsetTime
+	var time int64
 
 	switch offset {
-	case OffsetNewest:
-		where = LatestOffsets
-	case OffsetOldest:
-		where = EarliestOffset
+	case OffsetNewest, OffsetOldest:
+		time = offset
 	default:
 		if offset < 0 {
 			return ConfigurationError("Invalid offset")
@@ -329,7 +327,7 @@ func (child *partitionConsumer) chooseStartingOffset(offset int64) (err error) {
 		return nil
 	}
 
-	child.offset, err = child.consumer.client.GetOffset(child.topic, child.partition, where)
+	child.offset, err = child.consumer.client.GetOffset(child.topic, child.partition, time)
 	return err
 }
 

+ 2 - 14
offset_request.go

@@ -1,19 +1,7 @@
 package sarama
 
-// OffsetTime is used in Offset Requests to ask for all messages before a certain time. Any positive int64
-// value will be interpreted as milliseconds, or use the special constants defined here.
-type OffsetTime int64
-
-const (
-	// LatestOffsets askes for the latest offsets.
-	LatestOffsets OffsetTime = -1
-	// EarliestOffset askes for the earliest available offset. Note that because offsets are pulled in descending order,
-	// asking for the earliest offset will always return you a single element.
-	EarliestOffset OffsetTime = -2
-)
-
 type offsetRequestBlock struct {
-	time       OffsetTime
+	time       int64
 	maxOffsets int32
 }
 
@@ -61,7 +49,7 @@ func (r *OffsetRequest) version() int16 {
 	return 0
 }
 
-func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time OffsetTime, maxOffsets int32) {
+func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
 	if r.blocks == nil {
 		r.blocks = make(map[string]map[int32]*offsetRequestBlock)
 	}