瀏覽代碼

Allow the Consumer to disable auto-commit offsets (#1164)

* Shopify/sarama#1158 Allow the Consumer to disable auto-commit offsets

* Ignoring Close if commit offsets are disabled. Avoit starting mainloop if commit offsets are disabled

* Renamed Consumer.Offsets.AutoCommitEnable to Consumer.Offsets.AutoCommit.Enable. Renamed Consumer.Offsets.CommitInteval to Consumer.Offsets.AutoCommit.Inteval

* started on unittest for Consumer.Offsets.AutoCommit.Enable

* Moved check for Consumer.Offsets.AutoCommit.Enable to offsetManager.flushToBroker to keep mainLoop

* moved ticker back to struct

* Fixed TestNewOffsetManagerOffsetsAutoCommit fails because of low timeout
Kjell Tore Fossbakk 6 年之前
父節點
當前提交
72a629d7d1
共有 3 個文件被更改,包括 98 次插入10 次删除
  1. 12 4
      config.go
  2. 6 1
      offset_manager.go
  3. 80 5
      offset_manager_test.go

+ 12 - 4
config.go

@@ -338,8 +338,15 @@ type Config struct {
 		// offsets. This currently requires the manual use of an OffsetManager
 		// but will eventually be automated.
 		Offsets struct {
-			// How frequently to commit updated offsets. Defaults to 1s.
-			CommitInterval time.Duration
+			AutoCommit struct {
+				// Whether or not to auto-commit updated offsets back to the broker.
+				// (default enabled).
+				Enable bool
+
+				// How frequently to commit updated offsets. Ineffective unless
+				// auto-commit is enabled (default 1s)
+				Interval time.Duration
+			}
 
 			// The initial offset to use if no offset was previously committed.
 			// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
@@ -423,7 +430,8 @@ func NewConfig() *Config {
 	c.Consumer.MaxWaitTime = 250 * time.Millisecond
 	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
 	c.Consumer.Return.Errors = false
-	c.Consumer.Offsets.CommitInterval = 1 * time.Second
+	c.Consumer.Offsets.AutoCommit.Enable = true
+	c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
 	c.Consumer.Offsets.Initial = OffsetNewest
 	c.Consumer.Offsets.Retry.Max = 3
 
@@ -650,7 +658,7 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
 	case c.Consumer.Retry.Backoff < 0:
 		return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
-	case c.Consumer.Offsets.CommitInterval <= 0:
+	case c.Consumer.Offsets.AutoCommit.Interval <= 0:
 		return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
 	case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
 		return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")

+ 6 - 1
offset_manager.go

@@ -58,7 +58,7 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client
 		client: client,
 		conf:   conf,
 		group:  group,
-		ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval),
+		ticker: time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval),
 		poms:   make(map[string]map[int32]*partitionOffsetManager),
 
 		memberID:   memberID,
@@ -233,7 +233,12 @@ func (om *offsetManager) mainLoop() {
 	}
 }
 
+// flushToBroker is ignored if auto-commit offsets is disabled
 func (om *offsetManager) flushToBroker() {
+	if !om.conf.Consumer.Offsets.AutoCommit.Enable {
+		return
+	}
+
 	req := om.constructRequest()
 	if req == nil {
 		return

+ 80 - 5
offset_manager_test.go

@@ -7,15 +7,14 @@ import (
 )
 
 func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
-	backoffFunc func(retries, maxRetries int) time.Duration) (om OffsetManager,
+	backoffFunc func(retries, maxRetries int) time.Duration, config *Config) (om OffsetManager,
 	testClient Client, broker, coordinator *MockBroker) {
 
-	config := NewConfig()
 	config.Metadata.Retry.Max = 1
 	if backoffFunc != nil {
 		config.Metadata.Retry.BackoffFunc = backoffFunc
 	}
-	config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
+	config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Millisecond
 	config.Version = V0_9_0_0
 	if retention > 0 {
 		config.Consumer.Offsets.Retention = retention
@@ -52,7 +51,7 @@ func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
 
 func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
 	testClient Client, broker, coordinator *MockBroker) {
-	return initOffsetManagerWithBackoffFunc(t, retention, nil)
+	return initOffsetManagerWithBackoffFunc(t, retention, nil, NewConfig())
 }
 
 func initPartitionOffsetManager(t *testing.T, om OffsetManager,
@@ -97,6 +96,82 @@ func TestNewOffsetManager(t *testing.T) {
 	}
 }
 
+var offsetsautocommitTestTable = []struct {
+	name   string
+	set    bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable
+	enable bool
+}{
+	{
+		"AutoCommit (default)",
+		false, // use default
+		true,
+	},
+	{
+		"AutoCommit Enabled",
+		true,
+		true,
+	},
+	{
+		"AutoCommit Disabled",
+		true,
+		false,
+	},
+}
+
+func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
+	// Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable`
+	for _, tt := range offsetsautocommitTestTable {
+		t.Run(tt.name, func(t *testing.T) {
+
+			config := NewConfig()
+			if tt.set {
+				config.Consumer.Offsets.AutoCommit.Enable = tt.enable
+			}
+			om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
+			pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
+
+			// Wait long enough for the test not to fail..
+			timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval
+
+			called := make(chan none)
+
+			ocResponse := new(OffsetCommitResponse)
+			ocResponse.AddError("my_topic", 0, ErrNoError)
+			handler := func(req *request) (res encoder) {
+				close(called)
+				return ocResponse
+			}
+			coordinator.setHandler(handler)
+
+			// Should force an offset commit, if auto-commit is enabled.
+			expected := int64(1)
+			pom.ResetOffset(expected, "modified_meta")
+			_, _ = pom.NextOffset()
+
+			select {
+			case <-called:
+				// OffsetManager called on the wire.
+				if !config.Consumer.Offsets.AutoCommit.Enable {
+					t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name)
+				}
+			case <-time.After(timeout):
+				// Timeout waiting for OffsetManager to call on the wire.
+				if config.Consumer.Offsets.AutoCommit.Enable {
+					t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
+				}
+			}
+
+			broker.Close()
+			coordinator.Close()
+
+			// !! om must be closed before the pom so pom.release() is called before pom.Close()
+			safeClose(t, om)
+			safeClose(t, pom)
+			safeClose(t, testClient)
+		})
+	}
+}
+
 // Test recovery from ErrNotCoordinatorForConsumer
 // on first fetchInitialOffset call
 func TestOffsetManagerFetchInitialFail(t *testing.T) {
@@ -148,7 +223,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
 		atomic.AddInt32(&retryCount, 1)
 		return 0
 	}
-	om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff)
+	om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewConfig())
 
 	// Error on first fetchInitialOffset call
 	responseBlock := OffsetFetchResponseBlock{