浏览代码

Add missing Kafka versions

Maxim Vladimirskiy 7 年之前
父节点
当前提交
e42dc76c4e

+ 1 - 1
config.go

@@ -310,7 +310,7 @@ func NewConfig() *Config {
 
 	c.ClientID = defaultClientID
 	c.ChannelBufferSize = 256
-	c.Version = minVersion
+	c.Version = MinVersion
 	c.MetricRegistry = metrics.NewRegistry()
 
 	return c

+ 1 - 1
fetch_request.go

@@ -149,7 +149,7 @@ func (r *FetchRequest) requiredVersion() KafkaVersion {
 	case 4:
 		return V0_11_0_0
 	default:
-		return minVersion
+		return MinVersion
 	}
 }
 

+ 1 - 1
fetch_response.go

@@ -280,7 +280,7 @@ func (r *FetchResponse) requiredVersion() KafkaVersion {
 	case 4:
 		return V0_11_0_0
 	default:
-		return minVersion
+		return MinVersion
 	}
 }
 

+ 9 - 0
message.go

@@ -24,6 +24,15 @@ const (
 	CompressionLZ4    CompressionCodec = 3
 )
 
+func (cc CompressionCodec) String() string {
+	return []string{
+		"none",
+		"gzip",
+		"snappy",
+		"lz4",
+	}[int(cc)]
+}
+
 // CompressionLevelDefault is the constant to use in CompressionLevel
 // to have the default compression level for any codec. The value is picked
 // that we don't use any existing compression levels.

+ 1 - 1
metadata_request.go

@@ -48,5 +48,5 @@ func (r *MetadataRequest) version() int16 {
 }
 
 func (r *MetadataRequest) requiredVersion() KafkaVersion {
-	return minVersion
+	return MinVersion
 }

+ 1 - 1
metadata_response.go

@@ -185,7 +185,7 @@ func (r *MetadataResponse) version() int16 {
 }
 
 func (r *MetadataResponse) requiredVersion() KafkaVersion {
-	return minVersion
+	return MinVersion
 }
 
 // testing API

+ 1 - 1
offset_commit_request.go

@@ -173,7 +173,7 @@ func (r *OffsetCommitRequest) requiredVersion() KafkaVersion {
 	case 2:
 		return V0_9_0_0
 	default:
-		return minVersion
+		return MinVersion
 	}
 }
 

+ 1 - 1
offset_commit_response.go

@@ -81,5 +81,5 @@ func (r *OffsetCommitResponse) version() int16 {
 }
 
 func (r *OffsetCommitResponse) requiredVersion() KafkaVersion {
-	return minVersion
+	return MinVersion
 }

+ 1 - 1
offset_fetch_request.go

@@ -68,7 +68,7 @@ func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
 	case 1:
 		return V0_8_2_0
 	default:
-		return minVersion
+		return MinVersion
 	}
 }
 

+ 1 - 1
offset_fetch_response.go

@@ -115,7 +115,7 @@ func (r *OffsetFetchResponse) version() int16 {
 }
 
 func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
-	return minVersion
+	return MinVersion
 }
 
 func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {

+ 1 - 1
offset_request.go

@@ -109,7 +109,7 @@ func (r *OffsetRequest) requiredVersion() KafkaVersion {
 	case 1:
 		return V0_10_1_0
 	default:
-		return minVersion
+		return MinVersion
 	}
 }
 

+ 1 - 1
offset_response.go

@@ -155,7 +155,7 @@ func (r *OffsetResponse) requiredVersion() KafkaVersion {
 	case 1:
 		return V0_10_1_0
 	default:
-		return minVersion
+		return MinVersion
 	}
 }
 

+ 1 - 1
produce_request.go

@@ -215,7 +215,7 @@ func (r *ProduceRequest) requiredVersion() KafkaVersion {
 	case 3:
 		return V0_11_0_0
 	default:
-		return minVersion
+		return MinVersion
 	}
 }
 

+ 1 - 1
produce_response.go

@@ -152,7 +152,7 @@ func (r *ProduceResponse) requiredVersion() KafkaVersion {
 	case 3:
 		return V0_11_0_0
 	default:
-		return minVersion
+		return MinVersion
 	}
 }
 

+ 39 - 13
utils.go

@@ -139,21 +139,47 @@ func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool {
 
 // Effective constants defining the supported kafka versions.
 var (
-	V0_8_2_0   = newKafkaVersion(0, 8, 2, 0)
-	V0_8_2_1   = newKafkaVersion(0, 8, 2, 1)
-	V0_8_2_2   = newKafkaVersion(0, 8, 2, 2)
-	V0_9_0_0   = newKafkaVersion(0, 9, 0, 0)
-	V0_9_0_1   = newKafkaVersion(0, 9, 0, 1)
-	V0_10_0_0  = newKafkaVersion(0, 10, 0, 0)
-	V0_10_0_1  = newKafkaVersion(0, 10, 0, 1)
-	V0_10_1_0  = newKafkaVersion(0, 10, 1, 0)
-	V0_10_2_0  = newKafkaVersion(0, 10, 2, 0)
-	V0_11_0_0  = newKafkaVersion(0, 11, 0, 0)
-	V1_0_0_0   = newKafkaVersion(1, 0, 0, 0)
-	minVersion = V0_8_2_0
+	V0_8_2_0  = newKafkaVersion(0, 8, 2, 0)
+	V0_8_2_1  = newKafkaVersion(0, 8, 2, 1)
+	V0_8_2_2  = newKafkaVersion(0, 8, 2, 2)
+	V0_9_0_0  = newKafkaVersion(0, 9, 0, 0)
+	V0_9_0_1  = newKafkaVersion(0, 9, 0, 1)
+	V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
+	V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
+	V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
+	V0_10_1_1 = newKafkaVersion(0, 10, 1, 1)
+	V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
+	V0_10_2_1 = newKafkaVersion(0, 10, 2, 1)
+	V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
+	V0_11_0_1 = newKafkaVersion(0, 11, 0, 1)
+	V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
+	V1_0_0_0  = newKafkaVersion(1, 0, 0, 0)
+
+	SupportedVersions = []KafkaVersion{
+		V0_8_2_0,
+		V0_8_2_1,
+		V0_8_2_2,
+		V0_9_0_0,
+		V0_9_0_1,
+		V0_10_0_0,
+		V0_10_0_1,
+		V0_10_1_0,
+		V0_10_1_1,
+		V0_10_2_0,
+		V0_10_2_1,
+		V0_11_0_0,
+		V0_11_0_1,
+		V0_11_0_2,
+		V1_0_0_0,
+	}
+	MinVersion = V0_8_2_0
+	MaxVersion = V1_0_0_0
 )
 
 func ParseKafkaVersion(s string) (KafkaVersion, error) {
+	if len(s) < 5 {
+		return MinVersion, fmt.Errorf("invalid version `%s`", s)
+	}
 	var major, minor, veryMinor, patch uint
 	var err error
 	if s[0] == '0' {
@@ -162,7 +188,7 @@ func ParseKafkaVersion(s string) (KafkaVersion, error) {
 		err = scanKafkaVersion(s, `^\d+\.\d+\.\d+$`, "%d.%d.%d", [3]*uint{&major, &minor, &veryMinor})
 	}
 	if err != nil {
-		return minVersion, err
+		return MinVersion, err
 	}
 	return newKafkaVersion(major, minor, veryMinor, patch), nil
 }