Browse Source

Merge pull request #19 from Shopify/hash_partitioner

Implement a hashing partitioner
Evan Huus 12 years ago
parent
commit
a39b0ce40f
3 changed files with 80 additions and 7 deletions
  1. 43 3
      partitioner.go
  2. 36 3
      partitioner_test.go
  3. 1 1
      producer.go

+ 43 - 3
partitioner.go

@@ -1,6 +1,11 @@
 package sarama
 package sarama
 
 
-import "math/rand"
+import (
+	"hash"
+	"hash/fnv"
+	"math/rand"
+	"time"
+)
 
 
 // Partitioner is anything that, given a Kafka message key and a number of partitions indexed [0...numPartitions-1],
 // Partitioner is anything that, given a Kafka message key and a number of partitions indexed [0...numPartitions-1],
 // decides to which partition to send the message. RandomPartitioner and RoundRobinPartitioner are the
 // decides to which partition to send the message. RandomPartitioner and RoundRobinPartitioner are the
@@ -11,10 +16,17 @@ type Partitioner interface {
 
 
 // RandomPartitioner implements the Partitioner interface by choosing a random partition each time.
 // RandomPartitioner implements the Partitioner interface by choosing a random partition each time.
 type RandomPartitioner struct {
 type RandomPartitioner struct {
+	generator *rand.Rand
 }
 }
 
 
-func (p RandomPartitioner) Partition(key Encoder, numPartitions int) int {
-	return rand.Intn(numPartitions)
+// NewRandomPartitioner returns a RandomPartitioner whose private generator is seeded
+// from the current time in nanoseconds.
+func NewRandomPartitioner() *RandomPartitioner {
+	seed := time.Now().UTC().UnixNano()
+	return &RandomPartitioner{generator: rand.New(rand.NewSource(seed))}
+}
+
+func (p *RandomPartitioner) Partition(key Encoder, numPartitions int) int {
+	return p.generator.Intn(numPartitions)
 }
 }
 
 
 // RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
 // RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
@@ -30,3 +42,31 @@ func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int) int {
 	p.partition++
 	p.partition++
 	return ret
 	return ret
 }
 }
+
+// HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition
+// is chosen. Otherwise the FNV-1a hash of the encoded bytes is taken modulo the number of partitions. This ensures that
+// messages with the same key always end up on the same partition (for a fixed number of partitions).
+type HashPartitioner struct {
+	random *RandomPartitioner // fallback used when the key is nil or fails to encode
+	hasher hash.Hash32        // reset and reused on every Partition call
+}
+
+// NewHashPartitioner returns a HashPartitioner holding a fresh random partitioner and a
+// 32-bit FNV-1a hasher.
+func NewHashPartitioner() *HashPartitioner {
+	return &HashPartitioner{
+		random: NewRandomPartitioner(),
+		hasher: fnv.New32a(),
+	}
+}
+
+// Partition picks the partition for key out of numPartitions. A nil key, or a key whose
+// Encode call returns an error, is delegated to the partitioner's random fallback; otherwise
+// the result is the FNV-1a hash of the encoded bytes modulo numPartitions, so equal keys
+// always map to the same partition for a given numPartitions.
+func (p *HashPartitioner) Partition(key Encoder, numPartitions int) int {
+	if key == nil {
+		return p.random.Partition(key, numPartitions)
+	}
+	encoded, err := key.Encode()
+	if err != nil {
+		return p.random.Partition(key, numPartitions)
+	}
+	p.hasher.Reset()
+	// hash.Hash documents that Write never returns an error, so the result is not checked.
+	p.hasher.Write(encoded)
+	// Reduce in int64: converting the uint32 sum straight to int wraps negative where int
+	// is 32 bits wide, and Go's % keeps the dividend's sign, yielding a negative partition.
+	return int(int64(p.hasher.Sum32()) % int64(numPartitions))
+}

+ 36 - 3
partitioner_test.go

@@ -2,8 +2,21 @@ package sarama
 
 
 import "testing"
 import "testing"
 
 
+// assertPartitioningConsistent checks that the first partition chosen for key lies in
+// [0, numPartitions) and that 49 further calls with the same arguments return that same partition.
+func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, key Encoder, numPartitions int) {
+	first := partitioner.Partition(key, numPartitions)
+	if !(0 <= first && first < numPartitions) {
+		t.Error(partitioner, "returned partition", first, "outside of range for", key)
+	}
+	for attempt := 1; attempt < 50; attempt++ {
+		if again := partitioner.Partition(key, numPartitions); again != first {
+			t.Error(partitioner, "returned partition", again, "inconsistent with", first, ".")
+		}
+	}
+}
+
 func TestRandomPartitioner(t *testing.T) {
 func TestRandomPartitioner(t *testing.T) {
-	partitioner := RandomPartitioner{}
+	partitioner := NewRandomPartitioner()
 
 
 	choice := partitioner.Partition(nil, 1)
 	choice := partitioner.Partition(nil, 1)
 	if choice != 0 {
 	if choice != 0 {
@@ -13,7 +26,7 @@ func TestRandomPartitioner(t *testing.T) {
 	for i := 1; i < 50; i++ {
 	for i := 1; i < 50; i++ {
 		choice := partitioner.Partition(nil, 50)
 		choice := partitioner.Partition(nil, 50)
 		if choice < 0 || choice >= 50 {
 		if choice < 0 || choice >= 50 {
-			t.Fatal("Returned partition", choice, "outside of range.")
+			t.Error("Returned partition", choice, "outside of range.")
 		}
 		}
 	}
 	}
 }
 }
@@ -29,7 +42,27 @@ func TestRoundRobinPartitioner(t *testing.T) {
 	for i := 1; i < 50; i++ {
 	for i := 1; i < 50; i++ {
 		choice := partitioner.Partition(nil, 7)
 		choice := partitioner.Partition(nil, 7)
 		if choice != i%7 {
 		if choice != i%7 {
-			t.Fatal("Returned partition", choice, "expecting", i%7)
+			t.Error("Returned partition", choice, "expecting", i%7)
+		}
+	}
+}
+
+func TestHashPartitioner(t *testing.T) {
+	partitioner := NewHashPartitioner()
+
+	choice := partitioner.Partition(nil, 1)
+	if choice != 0 {
+		t.Error("Returned non-zero partition when only one available.")
+	}
+
+	for i := 1; i < 50; i++ {
+		choice := partitioner.Partition(nil, 50)
+		if choice < 0 || choice >= 50 {
+			t.Error("Returned partition", choice, "outside of range for nil key.")
 		}
 		}
 	}
 	}
+
+	assertPartitioningConsistent(t, partitioner, StringEncoder("ABC"), 50)
+	assertPartitioningConsistent(t, partitioner, StringEncoder("ABCDEF"), 37)
+	assertPartitioningConsistent(t, partitioner, StringEncoder("some random thing"), 3)
 }
 }

+ 1 - 1
producer.go

@@ -32,7 +32,7 @@ func NewProducer(client *Client, topic string, config *ProducerConfig) (*Produce
 	}
 	}
 
 
 	if config.Partitioner == nil {
 	if config.Partitioner == nil {
-		config.Partitioner = RandomPartitioner{}
+		config.Partitioner = NewRandomPartitioner()
 	}
 	}
 
 
 	p := new(Producer)
 	p := new(Producer)