|
|
@@ -82,16 +82,15 @@ func (p *roundRobinPartitioner) RequiresConsistency() bool {
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
-type hasherFunc func() hash.Hash32
|
|
|
-
|
|
|
type hashPartitioner struct {
|
|
|
random Partitioner
|
|
|
hasher hash.Hash32
|
|
|
}
|
|
|
|
|
|
-// NewCustomHashPartitioner is a wrapper around NewHashPartitioner,
|
|
|
-// allowing the use of custom hasher
|
|
|
-func NewCustomHashPartitioner(hasher hasherFunc) PartitionerConstructor {
|
|
|
+// NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher.
|
|
|
+// The argument is a function providing the instance, implementing the hash.Hash32 interface. This is to ensure that
|
|
|
+// each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance.
|
|
|
+func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor {
|
|
|
return func(topic string) Partitioner {
|
|
|
p := new(hashPartitioner)
|
|
|
p.random = NewRandomPartitioner(topic)
|