
Merge pull request #520 from Shopify/offset-manager-initial

Offset manager: make initial offset configurable
Willem van Bergen 9 years ago
parent
commit
4c0d6fcb35
5 changed files with 132 additions and 30 deletions
  1. config.go (+8 -0)
  2. functional_offset_manager_test.go (+51 -0)
  3. functional_test.go (+1 -1)
  4. offset_manager.go (+34 -13)
  5. offset_manager_test.go (+38 -16)

+ 8 - 0
config.go

@@ -132,6 +132,10 @@ type Config struct {
 		Offsets struct {
 			// How frequently to commit updated offsets. Defaults to 10s.
 			CommitInterval time.Duration
+
+			// The initial offset to use if no offset was previously committed. Should be OffsetNewest or OffsetOldest.
+			// Defaults to OffsetNewest.
+			Initial int64
 		}
 	}
 
@@ -172,6 +176,7 @@ func NewConfig() *Config {
 	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
 	c.Consumer.Return.Errors = false
 	c.Consumer.Offsets.CommitInterval = 10 * time.Second
+	c.Consumer.Offsets.Initial = OffsetNewest
 
 	c.ChannelBufferSize = 256
 
@@ -273,6 +278,9 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
 	case c.Consumer.Offsets.CommitInterval <= 0:
 		return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
+	case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
+		return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
+
 	}
 
 	// validate misc shared values
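
A minimal usage sketch of the new setting (illustrative only, not part of the diff; assumes the usual github.com/Shopify/sarama import path):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()

	// Resume from the oldest available offset when no offset has been
	// committed for a partition yet; the default is OffsetNewest.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	if err := config.Validate(); err != nil {
		// Any value other than OffsetOldest or OffsetNewest is rejected here.
		log.Fatal(err)
	}

	log.Println("config is valid; pass it to sarama.NewClient as usual")
}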

+ 51 - 0
functional_offset_manager_test.go

@@ -0,0 +1,51 @@
+package sarama
+
+import (
+	"testing"
+)
+
+func TestFuncOffsetManager(t *testing.T) {
+	checkKafkaVersion(t, "0.8.2")
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
+
+	client, err := NewClient(kafkaBrokers, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	offsetManager, err := NewOffsetManagerFromClient("sarama.TestFuncOffsetManager", client)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if _, err := offsetManager.ManagePartition("does_not_exist", 123); err != ErrUnknownTopicOrPartition {
+		t.Fatal("Expected ErrUnknownTopicOrPartition when starting a partition offset manager for a partition that does not exist, got:", err)
+	}
+
+	pom1, err := offsetManager.ManagePartition("test.1", 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pom1.MarkOffset(10, "test metadata")
+	safeClose(t, pom1)
+
+	pom2, err := offsetManager.ManagePartition("test.1", 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	offset, metadata := pom2.NextOffset()
+
+	if offset != 10+1 {
+		t.Errorf("Expected the next offset to be 11, found %d.", offset)
+	}
+	if metadata != "test metadata" {
+		t.Errorf("Expected metadata to be 'test metadata', found %s.", metadata)
+	}
+
+	safeClose(t, pom2)
+	safeClose(t, offsetManager)
+	safeClose(t, client)
+}

+ 1 - 1
functional_test.go

@@ -75,7 +75,7 @@ func checkKafkaAvailability(t testing.TB) {
 func checkKafkaVersion(t testing.TB, requiredVersion string) {
 	kafkaVersion := os.Getenv("KAFKA_VERSION")
 	if kafkaVersion == "" {
-		t.Logf("No KAFKA_VERSION set. This tests requires Kafka version %s or higher. Continuing...", requiredVersion)
+		t.Logf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
 	} else {
 		available := parseKafkaVersion(kafkaVersion)
 		required := parseKafkaVersion(requiredVersion)

+ 34 - 13
offset_manager.go

@@ -111,33 +111,49 @@ func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) {
 	delete(om.boms, bom.broker)
 }
 
+func (om *offsetManager) abandonPartitionOffsetManager(pom *partitionOffsetManager) {
+	om.lock.Lock()
+	defer om.lock.Unlock()
+
+	delete(om.poms[pom.topic], pom.partition)
+	if len(om.poms[pom.topic]) == 0 {
+		delete(om.poms, pom.topic)
+	}
+}
+
 // Partition Offset Manager
 
 // PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
 // on a partition offset manager to avoid leaks, it will not be garbage-collected automatically when it passes
 // out of scope.
 type PartitionOffsetManager interface {
-	// Offset returns the last offset that was marked as processed and associated metadata according to the manager;
-	// this value has not necessarily been flushed to the cluster yet. If you want to resume a partition consumer
-	// from where it left off, remember that you have to increment the offset by one so the partition consumer will
-	// start at the next message. This prevents the last committed message from being processed twice.
-	Offset() (int64, string)
-
-	// SetOffset sets the offset and metadata according to the manager; this value (or a subsequent update)
-	// will eventually be flushed to the cluster based on configuration. You should only set the offset of
-	// messages that have been completely processed.
-	SetOffset(offset int64, metadata string)
+	// NextOffset returns the next offset that should be consumed for the managed partition, accompanied
+	// by metadata which can be used to reconstruct the state of the partition consumer when it resumes.
+	// NextOffset() will return `config.Consumer.Offsets.Initial` and an empty metadata string if no
+	// offset was committed for this partition yet.
+	NextOffset() (int64, string)
+
+	// MarkOffset marks the provided offset as processed, alongside a metadata string that represents
+	// the state of the partition consumer at that point in time. The metadata string can be used by
+	// another consumer to restore that state, so it can resume consumption.
+	//
+	// Note: calling MarkOffset does not necessarily commit the offset to the backend store immediately
+	// for efficiency reasons, and it may never be committed if your application crashes. This means that
+	// you may end up processing the same message twice, and your processing should ideally be idempotent.
+	MarkOffset(offset int64, metadata string)
 
 	// Errors returns a read channel of errors that occur during offset management, if enabled. By default,
 	// errors are logged and not returned over this channel. If you want to implement any custom error
 	// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
 	Errors() <-chan *ConsumerError
+
 	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will return immediately,
 	// after which you should wait until the 'errors' channel has been drained and closed.
 	// It is required to call this function, or Close before a consumer object passes out of scope,
 	// as it will otherwise leak memory.  You must call this before calling Close on the underlying
 	// client.
 	AsyncClose()
+
 	// Close stops the PartitionOffsetManager from managing offsets. It is required to call this function
 	// (or AsyncClose) before a PartitionOffsetManager object passes out of scope, as it will otherwise
 	// leak memory. You must call this before calling Close on the underlying client.
@@ -205,6 +221,7 @@ func (pom *partitionOffsetManager) mainLoop() {
 				}
 				pom.parent.unrefBrokerOffsetManager(pom.broker)
 			}
+			pom.parent.abandonPartitionOffsetManager(pom)
 			close(pom.errors)
 			return
 		}
@@ -290,7 +307,7 @@ func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
 	return pom.errors
 }
 
-func (pom *partitionOffsetManager) SetOffset(offset int64, metadata string) {
+func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
 	pom.lock.Lock()
 	defer pom.lock.Unlock()
 
@@ -313,11 +330,15 @@ func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string
 	}
 }
 
-func (pom *partitionOffsetManager) Offset() (int64, string) {
+func (pom *partitionOffsetManager) NextOffset() (int64, string) {
 	pom.lock.Lock()
 	defer pom.lock.Unlock()
 
-	return pom.offset, pom.metadata
+	if pom.offset >= 0 {
+		return pom.offset + 1, pom.metadata
+	} else {
+		return pom.parent.conf.Consumer.Offsets.Initial, ""
+	}
 }
 
 func (pom *partitionOffsetManager) AsyncClose() {
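
A rough end-to-end sketch of the renamed NextOffset/MarkOffset flow (illustrative only, not part of the diff; the broker address, group name, topic, and partition are placeholders):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// nil config: defaults apply, including Consumer.Offsets.Initial = OffsetNewest.
	client, err := sarama.NewClient([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	om, err := sarama.NewOffsetManagerFromClient("example-group", client)
	if err != nil {
		log.Fatal(err)
	}
	defer om.Close()

	pom, err := om.ManagePartition("my_topic", 0)
	if err != nil {
		log.Fatal(err)
	}
	defer pom.Close()

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Last committed offset + 1, or Consumer.Offsets.Initial if nothing
	// was committed for this partition yet.
	offset, _ := pom.NextOffset()

	pc, err := consumer.ConsumePartition("my_topic", 0, offset)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		log.Printf("processing message at offset %d", msg.Offset)
		// Mark the offset that was just processed; NextOffset will then
		// report msg.Offset+1 when consumption resumes.
		pom.MarkOffset(msg.Offset, "")
	}
}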

+ 38 - 16
offset_manager_test.go

@@ -42,13 +42,13 @@ func initOffsetManager(t *testing.T) (om OffsetManager,
 }
 
 func initPartitionOffsetManager(t *testing.T, om OffsetManager,
-	coordinator *mockBroker) PartitionOffsetManager {
+	coordinator *mockBroker, initialOffset int64, metadata string) PartitionOffsetManager {
 
 	fetchResponse := new(OffsetFetchResponse)
 	fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
 		Err:      ErrNoError,
-		Offset:   5,
-		Metadata: "test_meta",
+		Offset:   initialOffset,
+		Metadata: metadata,
 	})
 	coordinator.Returns(fetchResponse)
 
@@ -162,12 +162,34 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
 	safeClose(t, testClient)
 }
 
-func TestPartitionOffsetManagerOffset(t *testing.T) {
+func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
+	om, testClient, broker, coordinator := initOffsetManager(t)
+	testClient.Config().Consumer.Offsets.Initial = OffsetOldest
+
+	// Kafka returns -1 if no offset has been stored for this partition yet.
+	pom := initPartitionOffsetManager(t, om, coordinator, -1, "")
+
+	offset, meta := pom.NextOffset()
+	if offset != OffsetOldest {
+		t.Errorf("Expected offset %d. Actual: %v", OffsetOldest, offset)
+	}
+	if meta != "" {
+		t.Errorf("Expected metadata to be empty. Actual: %q", meta)
+	}
+
+	safeClose(t, pom)
+	safeClose(t, om)
+	broker.Close()
+	coordinator.Close()
+	safeClose(t, testClient)
+}
+
+func TestPartitionOffsetManagerNextOffset(t *testing.T) {
 	om, testClient, broker, coordinator := initOffsetManager(t)
-	pom := initPartitionOffsetManager(t, om, coordinator)
+	pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")
 
-	offset, meta := pom.Offset()
-	if offset != 5 {
+	offset, meta := pom.NextOffset()
+	if offset != 6 {
 		t.Errorf("Expected offset 5. Actual: %v", offset)
 	}
 	if meta != "test_meta" {
@@ -181,18 +203,18 @@ func TestPartitionOffsetManagerOffset(t *testing.T) {
 	safeClose(t, testClient)
 }
 
-func TestPartitionOffsetManagerSetOffset(t *testing.T) {
+func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
 	om, testClient, broker, coordinator := initOffsetManager(t)
-	pom := initPartitionOffsetManager(t, om, coordinator)
+	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
 
 	ocResponse := new(OffsetCommitResponse)
 	ocResponse.AddError("my_topic", 0, ErrNoError)
 	coordinator.Returns(ocResponse)
 
-	pom.SetOffset(100, "modified_meta")
-	offset, meta := pom.Offset()
+	pom.MarkOffset(100, "modified_meta")
+	offset, meta := pom.NextOffset()
 
-	if offset != 100 {
+	if offset != 101 {
 		t.Errorf("Expected offset 100. Actual: %v", offset)
 	}
 	if meta != "modified_meta" {
@@ -208,7 +230,7 @@ func TestPartitionOffsetManagerSetOffset(t *testing.T) {
 
 func TestPartitionOffsetManagerCommitErr(t *testing.T) {
 	om, testClient, broker, coordinator := initOffsetManager(t)
-	pom := initPartitionOffsetManager(t, om, coordinator)
+	pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
 
 	// Error on one partition
 	ocResponse := new(OffsetCommitResponse)
@@ -265,7 +287,7 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) {
 	ocResponse5.AddError("my_topic", 0, ErrNoError)
 	newCoordinator.Returns(ocResponse5)
 
-	pom.SetOffset(100, "modified_meta")
+	pom.MarkOffset(100, "modified_meta")
 
 	err := pom.Close()
 	if err != nil {
@@ -282,7 +304,7 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) {
 // Test of recovery from abort
 func TestAbortPartitionOffsetManager(t *testing.T) {
 	om, testClient, broker, coordinator := initOffsetManager(t)
-	pom := initPartitionOffsetManager(t, om, coordinator)
+	pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
 
 	// this triggers an error in the CommitOffset request,
 	// which leads to the abort call
@@ -300,7 +322,7 @@ func TestAbortPartitionOffsetManager(t *testing.T) {
 	ocResponse.AddError("my_topic", 0, ErrNoError)
 	newCoordinator.Returns(ocResponse)
 
-	pom.SetOffset(100, "modified_meta")
+	pom.MarkOffset(100, "modified_meta")
 
 	safeClose(t, pom)
 	safeClose(t, om)