Browse Source

Merge pull request #522 from Shopify/offset-manager-tweaks

A few minor offset-manager tweaks
Evan Huus 9 years ago
parent
commit
9dd45b2fba
2 changed files with 7 additions and 5 deletions
  1. 2 2
      config.go
  2. 5 3
      offset_manager.go

+ 2 - 2
config.go

@@ -130,7 +130,7 @@ type Config struct {
 		// Offsets specifies configuration for how and when to commit consumed offsets. This currently requires the
 		// manual use of an OffsetManager but will eventually be automated.
 		Offsets struct {
-			// How frequently to commit updated offsets. Defaults to 10s.
+			// How frequently to commit updated offsets. Defaults to 1s.
 			CommitInterval time.Duration
 
 			// The initial offset to use if no offset was previously committed. Should be OffsetNewest or OffsetOldest.
@@ -175,7 +175,7 @@ func NewConfig() *Config {
 	c.Consumer.MaxWaitTime = 250 * time.Millisecond
 	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
 	c.Consumer.Return.Errors = false
-	c.Consumer.Offsets.CommitInterval = 10 * time.Second
+	c.Consumer.Offsets.CommitInterval = 1 * time.Second
 	c.Consumer.Offsets.Initial = OffsetNewest
 
 	c.ChannelBufferSize = 256

+ 5 - 3
offset_manager.go

@@ -311,9 +311,11 @@ func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
 	pom.lock.Lock()
 	defer pom.lock.Unlock()
 
-	pom.offset = offset
-	pom.metadata = metadata
-	pom.dirty = true
+	if offset > pom.offset {
+		pom.offset = offset
+		pom.metadata = metadata
+		pom.dirty = true
+	}
 }
 
 func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {