Jelajahi Sumber

Implement a hashing partitioner

Evan Huus 12 tahun lalu
induk
melakukan
14dc1b2c4c
2 mengubah file dengan 56 tambahan dan 3 penghapusan
  1. 21 1
      partitioner.go
  2. 35 2
      partitioner_test.go

+ 21 - 1
partitioner.go

@@ -1,6 +1,9 @@
 package sarama
 
-import "math/rand"
+import (
+	"hash/crc32"
+	"math/rand"
+)
 
 // Partitioner is anything that, given a Kafka message key and a number of partitions indexed [0...numPartitions-1],
 // decides to which partition to send the message. RandomPartitioner and RoundRobinPartitioner are the
@@ -30,3 +33,20 @@ func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int) int {
 	p.partition++
 	return ret
 }
+
+// HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition
+// is chosen. Otherwise the CRC32 (IEEE) checksum of the encoded bytes, modulo the number of partitions, is used. This
+// ensures that messages with the same key always end up on the same partition.
+type HashPartitioner struct {
+}
+
+// Partition returns the partition for key among numPartitions partitions. A nil key, or a key whose Encode call
+// returns an error, gets a partition from rand.Intn; any other key gets the CRC32 (IEEE) checksum of its encoded
+// bytes reduced modulo numPartitions.
+func (p HashPartitioner) Partition(key Encoder, numPartitions int) int {
+	if key == nil {
+		return rand.Intn(numPartitions)
+	}
+	encoded, err := key.Encode()
+	if err != nil {
+		return rand.Intn(numPartitions)
+	}
+	// Reduce in int64 before converting to int: where int is 32 bits wide, converting a checksum of 2^31 or more to
+	// int first yields a negative value, and Go's % keeps the sign of the dividend, so the partition would be negative.
+	return int(int64(crc32.ChecksumIEEE(encoded)) % int64(numPartitions))
+}

+ 35 - 2
partitioner_test.go

@@ -2,6 +2,19 @@ package sarama
 
 import "testing"
 
+// assertPartitioningConsistent reports an error if partitioner picks a partition outside [0, numPartitions) for key,
+// and reports a further error for each of 49 repeat calls whose result differs from the first one.
+func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, key Encoder, numPartitions int) {
+	first := partitioner.Partition(key, numPartitions)
+	if !(0 <= first && first < numPartitions) {
+		t.Error(partitioner, "returned partition", first, "outside of range for", key)
+	}
+	for attempt := 1; attempt < 50; attempt++ {
+		if again := partitioner.Partition(key, numPartitions); again != first {
+			t.Error(partitioner, "returned partition", again, "inconsistent with", first, ".")
+		}
+	}
+}
+
 func TestRandomPartitioner(t *testing.T) {
 	partitioner := RandomPartitioner{}
 
@@ -13,7 +26,7 @@ func TestRandomPartitioner(t *testing.T) {
 	for i := 1; i < 50; i++ {
 		choice := partitioner.Partition(nil, 50)
 		if choice < 0 || choice >= 50 {
-			t.Fatal("Returned partition", choice, "outside of range.")
+			t.Error("Returned partition", choice, "outside of range.")
 		}
 	}
 }
@@ -29,7 +42,27 @@ func TestRoundRobinPartitioner(t *testing.T) {
 	for i := 1; i < 50; i++ {
 		choice := partitioner.Partition(nil, 7)
 		if choice != i%7 {
-			t.Fatal("Returned partition", choice, "expecting", i%7)
+			t.Error("Returned partition", choice, "expecting", i%7)
+		}
+	}
+}
+
+// TestHashPartitioner exercises HashPartitioner with a nil key (one partition, then 50 partitions) and then uses
+// assertPartitioningConsistent to check that three string keys each keep mapping to the same partition.
+func TestHashPartitioner(t *testing.T) {
+	partitioner := HashPartitioner{}
+
+	// With a single partition, index 0 is the only acceptable result.
+	choice := partitioner.Partition(nil, 1)
+	if choice != 0 {
+		t.Error("Returned non-zero partition when only one available.")
+	}
+
+	// A nil key must still land inside [0, 50) on every one of the 49 calls.
+	for i := 1; i < 50; i++ {
+		choice := partitioner.Partition(nil, 50)
+		if choice < 0 || choice >= 50 {
+			t.Error("Returned partition", choice, "outside of range for nil key.")
 		}
 	}
+
+	// Non-nil keys: repeated calls with the same key and partition count must agree.
+	assertPartitioningConsistent(t, partitioner, StringEncoder("ABC"), 50)
+	assertPartitioningConsistent(t, partitioner, StringEncoder("ABCDEF"), 37)
+	assertPartitioningConsistent(t, partitioner, StringEncoder("some random thing"), 3)
 }