|
|
@@ -82,14 +82,16 @@ func (p *roundRobinPartitioner) RequiresConsistency() bool {
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
+type hasherFunc func() hash.Hash32
|
|
|
+
|
|
|
type hashPartitioner struct {
|
|
|
random Partitioner
|
|
|
- hasher hash.Hash32
|
|
|
+ hasher hasherFunc
|
|
|
}
|
|
|
|
|
|
// NewCustomHashPartitioner is a wrapper around NewHashPartitioner,
|
|
|
// allowing the use of custom hasher
|
|
|
-func NewCustomHashPartitioner(hasher hash.Hash32) PartitionerConstructor {
|
|
|
+func NewCustomHashPartitioner(hasher hasherFunc) PartitionerConstructor {
|
|
|
return func(topic string) Partitioner {
|
|
|
p := new(hashPartitioner)
|
|
|
p.random = NewRandomPartitioner(topic)
|
|
|
@@ -105,7 +107,7 @@ func NewCustomHashPartitioner(hasher hash.Hash32) PartitionerConstructor {
|
|
|
func NewHashPartitioner(topic string) Partitioner {
|
|
|
p := new(hashPartitioner)
|
|
|
p.random = NewRandomPartitioner(topic)
|
|
|
- p.hasher = fnv.New32a()
|
|
|
+ p.hasher = fnv.New32a
|
|
|
return p
|
|
|
}
|
|
|
|
|
|
@@ -117,12 +119,12 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3
|
|
|
if err != nil {
|
|
|
return -1, err
|
|
|
}
|
|
|
- p.hasher.Reset()
|
|
|
- _, err = p.hasher.Write(bytes)
|
|
|
+ hash := p.hasher()
|
|
|
+ _, err = hash.Write(bytes)
|
|
|
if err != nil {
|
|
|
return -1, err
|
|
|
}
|
|
|
- partition := int32(p.hasher.Sum32()) % numPartitions
|
|
|
+ partition := int32(hash.Sum32()) % numPartitions
|
|
|
if partition < 0 {
|
|
|
partition = -partition
|
|
|
}
|