Parcourir la source

Guard against using LZ4 with old kafka protocol versions

Rene Treffer il y a 9 ans
Parent
commit
645b0298c1
2 fichiers modifiés avec 18 ajouts et 0 suppressions
  1. 6 0
      config.go
  2. 12 0
      config_test.go

+ 6 - 0
config.go

@@ -375,6 +375,12 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Producer.Retry.Backoff must be >= 0")
 	}
 
+	if c.Producer.Compression == CompressionLZ4 && (c.Version == V0_8_2_0 || c.Version == V0_8_2_1 ||
+		c.Version == V0_8_2_2 || c.Version == V0_9_0_0 ||
+		c.Version == V0_9_0_1) {
+		return ConfigurationError("lz4 compression requires Version >= V0_10_0_0")
+	}
+
 	// validate the Consumer values
 	switch {
 	case c.Consumer.Fetch.Min <= 0:

+ 12 - 0
config_test.go

@@ -33,6 +33,18 @@ func TestEmptyClientIDConfigValidates(t *testing.T) {
 	}
 }
 
+func TestLZ4ConfigValidation(t *testing.T) {
+	config := NewConfig()
+	config.Producer.Compression = CompressionLZ4
+	if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" {
+		t.Error("Expected invalid lz4/kakfa version error, got ", err)
+	}
+	config.Version = V0_10_0_0
+	if err := config.Validate(); err != nil {
+		t.Error("Expected lz4 to work, got ", err)
+	}
+}
+
 // This example shows how to integrate with an existing registry as well as publishing metrics
 // on the standard output
 func ExampleConfig_metrics() {