Kaynağa Gözat

Add custom offset commit retry config

Dimitrij Denissenko 7 yıl önce
ebeveyn
işleme
9f431f4dd8
2 değiştirilmiş dosya ile 10 ekleme ve 5 silme
  1. 9 1
      config.go
  2. 1 4
      offset_manager.go

+ 9 - 1
config.go

@@ -255,6 +255,12 @@ type Config struct {
 			// broker version 0.9.0 or later.
 			// (default is 0: disabled).
 			Retention time.Duration
+
+			Retry struct {
+				// The total number of times to retry failing commit
+				// requests during OffsetManager shutdown (default 3).
+				Max int
+			}
 		}
 	}
 
@@ -314,6 +320,7 @@ func NewConfig() *Config {
 	c.Consumer.Return.Errors = false
 	c.Consumer.Offsets.CommitInterval = 1 * time.Second
 	c.Consumer.Offsets.Initial = OffsetNewest
+	c.Consumer.Offsets.Retry.Max = 3
 
 	c.ClientID = defaultClientID
 	c.ChannelBufferSize = 256
@@ -450,7 +457,8 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
 	case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
 		return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
-
+	case c.Consumer.Offsets.Retry.Max < 0:
+		return ConfigurationError("Consumer.Offsets.Retry.Max must be >= 0")
 	}
 
 	// validate misc shared values

+ 1 - 4
offset_manager.go

@@ -95,14 +95,11 @@ func (om *offsetManager) Close() error {
 		om.asyncClosePOMs()
 
 		// flush one last time
-		for retries := om.conf.Metadata.Retry.Max; true; {
+		for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
 			om.flushToBroker()
 			if om.releasePOMs(false) == 0 {
 				break
 			}
-			if retries--; retries < 0 {
-				break
-			}
 		}
 
 		om.releasePOMs(true)