瀏覽代碼

Support configuring target kafka version

This is unused currently, but will have a bunch of uses soon. Based on a
simplified version of https://github.com/Shopify/sarama/pull/655 by Mohammad
Rafay Aleem.

Addresses the major chunk of #617.
Evan Huus 9 年之前
父節點
當前提交
5e8b6ff817
共有 3 個文件被更改,包括 68 次插入2 次删除
  1. 9 2
      config.go
  2. 38 0
      utils.go
  3. 21 0
      utils_test.go

+ 9 - 2
config.go

@@ -226,6 +226,13 @@ type Config struct {
 	// in the background while user code is working, greatly improving throughput.
 	// in the background while user code is working, greatly improving throughput.
 	// Defaults to 256.
 	// Defaults to 256.
 	ChannelBufferSize int
 	ChannelBufferSize int
+	// The version of Kafka that Sarama will assume it is running against.
+	// Defaults to the oldest supported stable version. Since Kafka provides
+	// backwards-compatibility, setting it to a version older than you have
+	// will not break anything, although it may prevent you from using the
+	// latest features. Setting it to a version greater than you are actually
+	// running may lead to random breakage.
+	Version KafkaVersion
 }
 }
 
 
 // NewConfig returns a new configuration instance with sane defaults.
 // NewConfig returns a new configuration instance with sane defaults.
@@ -258,9 +265,9 @@ func NewConfig() *Config {
 	c.Consumer.Offsets.CommitInterval = 1 * time.Second
 	c.Consumer.Offsets.CommitInterval = 1 * time.Second
 	c.Consumer.Offsets.Initial = OffsetNewest
 	c.Consumer.Offsets.Initial = OffsetNewest
 
 
-	c.ChannelBufferSize = 256
-
 	c.ClientID = defaultClientID
 	c.ClientID = defaultClientID
+	c.ChannelBufferSize = 256
+	c.Version = V0_8_2_0
 
 
 	return c
 	return c
 }
 }

+ 38 - 0
utils.go

@@ -109,3 +109,41 @@ func newBufConn(conn net.Conn) *bufConn {
 func (bc *bufConn) Read(b []byte) (n int, err error) {
 func (bc *bufConn) Read(b []byte) (n int, err error) {
 	return bc.buf.Read(b)
 	return bc.buf.Read(b)
 }
 }
+
+// KafkaVersion instances represent versions of the upstream Kafka broker.
+type KafkaVersion struct {
+	// it's a struct rather than just typing the array directly to make it opaque and stop people
+	// generating their own arbitrary versions
+	version [4]uint
+}
+
+func newKafkaVersion(major, minor, veryMinor, patch uint) KafkaVersion {
+	return KafkaVersion{
+		version: [4]uint{major, minor, veryMinor, patch},
+	}
+}
+
+// IsAtLeast return true if and only if the version it is called on is
+// greater than or equal to the version passed in:
+//    V1.IsAtLeast(V2) // false
+//    V2.IsAtLeast(V1) // true
+func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool {
+	for i := range v.version {
+		if v.version[i] > other.version[i] {
+			return true
+		} else if v.version[i] < other.version[i] {
+			return false
+		}
+	}
+	return true
+}
+
+// Effective constants defining the supported kafka versions.
+var (
+	V0_8_2_0  = newKafkaVersion(0, 8, 2, 0)
+	V0_8_2_1  = newKafkaVersion(0, 8, 2, 1)
+	V0_8_2_2  = newKafkaVersion(0, 8, 2, 2)
+	V0_9_0_0  = newKafkaVersion(0, 9, 0, 0)
+	V0_9_0_1  = newKafkaVersion(0, 9, 0, 1)
+	V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
+)

+ 21 - 0
utils_test.go

@@ -0,0 +1,21 @@
+package sarama
+
+import "testing"
+
+func TestVersionCompare(t *testing.T) {
+	if V0_8_2_0.IsAtLeast(V0_8_2_1) {
+		t.Error("0.8.2.0 >= 0.8.2.1")
+	}
+	if !V0_8_2_1.IsAtLeast(V0_8_2_0) {
+		t.Error("! 0.8.2.1 >= 0.8.2.0")
+	}
+	if !V0_8_2_0.IsAtLeast(V0_8_2_0) {
+		t.Error("! 0.8.2.0 >= 0.8.2.0")
+	}
+	if !V0_9_0_0.IsAtLeast(V0_8_2_1) {
+		t.Error("! 0.9.0.0 >= 0.8.2.1")
+	}
+	if V0_8_2_1.IsAtLeast(V0_10_0_0) {
+		t.Error("0.8.2.1 >= 0.10.0.0")
+	}
+}