Преглед изворни кода

Fix HashPartitioner returning negative partitions.

Evan Huus пре 12 година
родитељ
комит
f8f6c2a832
3 измењених фајлова са 27 додато и 15 уклоњено
  1. 11 7
      partitioner.go
  2. 12 6
      partitioner_test.go
  3. 4 2
      producer.go

+ 11 - 7
partitioner.go

@@ -11,7 +11,7 @@ import (
 // decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided
 // as simple default implementations.
 type Partitioner interface {
-	Partition(key Encoder, numPartitions int) int
+	Partition(key Encoder, numPartitions int32) int32
 }
 
 // RandomPartitioner implements the Partitioner interface by choosing a random partition each time.
@@ -25,16 +25,16 @@ func NewRandomPartitioner() *RandomPartitioner {
 	return p
 }
 
-func (p *RandomPartitioner) Partition(key Encoder, numPartitions int) int {
-	return p.generator.Intn(numPartitions)
+func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) int32 {
+	return int32(p.generator.Intn(int(numPartitions)))
 }
 
 // RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
 type RoundRobinPartitioner struct {
-	partition int
+	partition int32
 }
 
-func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int) int {
+func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) int32 {
 	if p.partition >= numPartitions {
 		p.partition = 0
 	}
@@ -58,7 +58,7 @@ func NewHashPartitioner() *HashPartitioner {
 	return p
 }
 
-func (p *HashPartitioner) Partition(key Encoder, numPartitions int) int {
+func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) int32 {
 	if key == nil {
 		return p.random.Partition(key, numPartitions)
 	}
@@ -68,5 +68,9 @@ func (p *HashPartitioner) Partition(key Encoder, numPartitions int) int {
 	}
 	p.hasher.Reset()
 	p.hasher.Write(bytes)
-	return int(p.hasher.Sum32()) % numPartitions
+	hash := int32(p.hasher.Sum32())
+	if hash < 0 {
+		hash = -hash
+	}
+	return hash % numPartitions
 }

+ 12 - 6
partitioner_test.go

@@ -1,8 +1,11 @@
 package sarama
 
-import "testing"
+import (
+	"crypto/rand"
+	"testing"
+)
 
-func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, key Encoder, numPartitions int) {
+func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, key Encoder, numPartitions int32) {
 	choice := partitioner.Partition(key, numPartitions)
 	if choice < 0 || choice >= numPartitions {
 		t.Error(partitioner, "returned partition", choice, "outside of range for", key)
@@ -39,7 +42,8 @@ func TestRoundRobinPartitioner(t *testing.T) {
 		t.Error("Returned non-zero partition when only one available.")
 	}
 
-	for i := 1; i < 50; i++ {
+	var i int32
+	for i = 1; i < 50; i++ {
 		choice := partitioner.Partition(nil, 7)
 		if choice != i%7 {
 			t.Error("Returned partition", choice, "expecting", i%7)
@@ -62,7 +66,9 @@ func TestHashPartitioner(t *testing.T) {
 		}
 	}
 
-	assertPartitioningConsistent(t, partitioner, StringEncoder("ABC"), 50)
-	assertPartitioningConsistent(t, partitioner, StringEncoder("ABCDEF"), 37)
-	assertPartitioningConsistent(t, partitioner, StringEncoder("some random thing"), 3)
+	buf := make([]byte, 256)
+	for i := 1; i < 50; i++ {
+		rand.Read(buf)
+		assertPartitioningConsistent(t, partitioner, ByteEncoder(buf), 50)
+	}
 }

+ 4 - 2
producer.go

@@ -64,9 +64,11 @@ func (p *Producer) choosePartition(key Encoder) (int32, error) {
 		return -1, err
 	}
 
-	choice := p.config.Partitioner.Partition(key, len(partitions))
+	numPartitions := int32(len(partitions))
 
-	if choice >= len(partitions) {
+	choice := p.config.Partitioner.Partition(key, numPartitions)
+
+	if choice < 0 || choice >= numPartitions {
 		return -1, InvalidPartition
 	}