Browse Source

Merge pull request #841 from manolovl/hasher-callback

Fix CustomHasher
Evan Huus 8 years ago
parent
commit
8093021a9b
2 changed files with 7 additions and 6 deletions
  1. 5 4
      partitioner.go
  2. 2 2
      partitioner_test.go

+ 5 - 4
partitioner.go

@@ -87,13 +87,14 @@ type hashPartitioner struct {
 	hasher hash.Hash32
 }
 
-// NewCustomHashPartitioner is a wrapper around NewHashPartitioner,
-// allowing the use of custom hasher
-func NewCustomHashPartitioner(hasher hash.Hash32) PartitionerConstructor {
+// NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher.
+// The argument is a function providing the instance, implementing the hash.Hash32 interface. This is to ensure that
+// each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance.
+func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor {
 	return func(topic string) Partitioner {
 		p := new(hashPartitioner)
 		p.random = NewRandomPartitioner(topic)
-		p.hasher = hasher
+		p.hasher = hasher()
 		return p
 	}
 }

+ 2 - 2
partitioner_test.go

@@ -73,7 +73,7 @@ func TestRoundRobinPartitioner(t *testing.T) {
 
 func TestNewHashPartitionerWithHasher(t *testing.T) {
 	// use the current default hasher fnv.New32a()
-	partitioner := NewCustomHashPartitioner(fnv.New32a())("mytopic")
+	partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic")
 
 	choice, err := partitioner.Partition(&ProducerMessage{}, 1)
 	if err != nil {
@@ -104,7 +104,7 @@ func TestNewHashPartitionerWithHasher(t *testing.T) {
 
 func TestHashPartitionerWithHasherMinInt32(t *testing.T) {
 	// use the current default hasher fnv.New32a()
-	partitioner := NewCustomHashPartitioner(fnv.New32a())("mytopic")
+	partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic")
 
 	msg := ProducerMessage{}
 	// "1468509572224" generates 2147483648 (uint32) result from Sum32 function