Browse Source

Fix HashPartitioner's negative result by applying modulo before checking negative

Tayida Tapjinda 9 years ago
parent
commit
0b1355f294
2 changed files with 21 additions and 3 deletions
  1. 4 3
      partitioner.go
  2. 17 0
      partitioner_test.go

+ 4 - 3
partitioner.go

@@ -112,10 +112,11 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3
 		return -1, err
 	}
 	hash := int32(p.hasher.Sum32())
-	if hash < 0 {
-		hash = -hash
+	partition := hash % numPartitions
+	if partition < 0 {
+		partition = -partition
 	}
-	return hash % numPartitions, nil
+	return partition, nil
 }
 
 func (p *hashPartitioner) RequiresConsistency() bool {

+ 17 - 0
partitioner_test.go

@@ -100,6 +100,23 @@ func TestHashPartitioner(t *testing.T) {
 	}
 }
 
+func TestHashPartitionerMinInt32(t *testing.T) {
+	partitioner := NewHashPartitioner("mytopic")
+
+	msg := ProducerMessage{}
+	// "1468509572224" generates 2147483648 (uint32) result from Sum32 function
+	// which is -2147483648 or int32's min value
+	msg.Key = StringEncoder("1468509572224")
+
+	choice, err := partitioner.Partition(&msg, 50)
+	if err != nil {
+		t.Error(partitioner, err)
+	}
+	if choice < 0 || choice >= 50 {
+		t.Error("Returned partition", choice, "outside of range for nil key.")
+	}
+}
+
 func TestManualPartitioner(t *testing.T) {
 	partitioner := NewManualPartitioner("mytopic")