|
|
@@ -1,8 +1,10 @@
|
|
|
package sarama
|
|
|
|
|
|
import (
|
|
|
- "hash/crc32"
|
|
|
+ "hash"
|
|
|
+ "hash/fnv"
|
|
|
"math/rand"
|
|
|
+ "time"
|
|
|
)
|
|
|
|
|
|
// Partitioner is anything that, given a Kafka message key and a number of partitions indexed [0...numPartitions-1],
|
|
|
@@ -14,10 +16,17 @@ type Partitioner interface {
|
|
|
|
|
|
// RandomPartitioner implements the Partitioner interface by choosing a random partition each time.
|
|
|
type RandomPartitioner struct {
|
|
|
+ generator *rand.Rand
|
|
|
}
|
|
|
|
|
|
-func (p RandomPartitioner) Partition(key Encoder, numPartitions int) int {
|
|
|
- return rand.Intn(numPartitions)
|
|
|
+func NewRandomPartitioner() *RandomPartitioner {
|
|
|
+ p := new(RandomPartitioner)
|
|
|
+ p.generator = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
|
|
|
+ return p
|
|
|
+}
|
|
|
+
|
|
|
+func (p *RandomPartitioner) Partition(key Encoder, numPartitions int) int {
|
|
|
+ return p.generator.Intn(numPartitions)
|
|
|
}
|
|
|
|
|
|
// RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
|
|
|
@@ -35,18 +44,29 @@ func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int) int {
|
|
|
}
|
|
|
|
|
|
// HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition
|
|
|
-// is chosen. Otherwise the CRC32 of the encoded bytes is used modulus the number of partitions. This ensures that messages
|
|
|
+// is chosen. Otherwise the FNV-1a hash of the encoded bytes is used modulus the number of partitions. This ensures that messages
|
|
|
// with the same key always end up on the same partition.
|
|
|
type HashPartitioner struct {
|
|
|
+ random *RandomPartitioner
|
|
|
+ hasher hash.Hash32
|
|
|
+}
|
|
|
+
|
|
|
+func NewHashPartitioner() *HashPartitioner {
|
|
|
+ p := new(HashPartitioner)
|
|
|
+ p.random = NewRandomPartitioner()
|
|
|
+ p.hasher = fnv.New32a()
|
|
|
+ return p
|
|
|
}
|
|
|
|
|
|
-func (p HashPartitioner) Partition(key Encoder, numPartitions int) int {
|
|
|
+func (p *HashPartitioner) Partition(key Encoder, numPartitions int) int {
|
|
|
if key == nil {
|
|
|
- return rand.Intn(numPartitions)
|
|
|
+ return p.random.Partition(key, numPartitions)
|
|
|
}
|
|
|
bytes, err := key.Encode()
|
|
|
if err != nil {
|
|
|
- return rand.Intn(numPartitions)
|
|
|
+ return p.random.Partition(key, numPartitions)
|
|
|
}
|
|
|
- return int(crc32.ChecksumIEEE(bytes)) % numPartitions
|
|
|
+ p.hasher.Reset()
|
|
|
+ p.hasher.Write(bytes)
|
|
|
+ return int(p.hasher.Sum32()) % numPartitions
|
|
|
}
|