Explorar el Código

Fix concurrency issue in hashedPartitioner

Passing an already instantiated hasher is a really bad idea. Instead pass a function returning the expected interface,
so the hasher is instantiated when needed, also assuring there's one hasher for each partitionDispatcher and thus avoid concurrency problems."
Lyuben Manolov hace 8 años
padre
commit
f3b2363114
Se han modificado 2 ficheros con 8 adiciones y 8 borrados
  1. 6 6
      partitioner.go
  2. 2 2
      partitioner_test.go

+ 6 - 6
partitioner.go

@@ -86,7 +86,7 @@ type hasherFunc func() hash.Hash32
 
 type hashPartitioner struct {
 	random Partitioner
-	hasher hasherFunc
+	hasher hash.Hash32
 }
 
 // NewCustomHashPartitioner is a wrapper around NewHashPartitioner,
@@ -95,7 +95,7 @@ func NewCustomHashPartitioner(hasher hasherFunc) PartitionerConstructor {
 	return func(topic string) Partitioner {
 		p := new(hashPartitioner)
 		p.random = NewRandomPartitioner(topic)
-		p.hasher = hasher
+		p.hasher = hasher()
 		return p
 	}
 }
@@ -107,7 +107,7 @@ func NewCustomHashPartitioner(hasher hasherFunc) PartitionerConstructor {
 func NewHashPartitioner(topic string) Partitioner {
 	p := new(hashPartitioner)
 	p.random = NewRandomPartitioner(topic)
-	p.hasher = fnv.New32a
+	p.hasher = fnv.New32a()
 	return p
 }
 
@@ -119,12 +119,12 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3
 	if err != nil {
 		return -1, err
 	}
-	hash := p.hasher()
-	_, err = hash.Write(bytes)
+	p.hasher.Reset()
+	_, err = p.hasher.Write(bytes)
 	if err != nil {
 		return -1, err
 	}
-	partition := int32(hash.Sum32()) % numPartitions
+	partition := int32(p.hasher.Sum32()) % numPartitions
 	if partition < 0 {
 		partition = -partition
 	}

+ 2 - 2
partitioner_test.go

@@ -73,7 +73,7 @@ func TestRoundRobinPartitioner(t *testing.T) {
 
 func TestNewHashPartitionerWithHasher(t *testing.T) {
 	// use the current default hasher fnv.New32a()
-	partitioner := NewCustomHashPartitioner(fnv.New32a())("mytopic")
+	partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic")
 
 	choice, err := partitioner.Partition(&ProducerMessage{}, 1)
 	if err != nil {
@@ -104,7 +104,7 @@ func TestNewHashPartitionerWithHasher(t *testing.T) {
 
 func TestHashPartitionerWithHasherMinInt32(t *testing.T) {
 	// use the current default hasher fnv.New32a()
-	partitioner := NewCustomHashPartitioner(fnv.New32a())("mytopic")
+	partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic")
 
 	msg := ProducerMessage{}
 	// "1468509572224" generates 2147483648 (uint32) result from Sum32 function