Browse Source

add reference-compatible hash partitioner

Evan Huus 6 years ago
parent
commit
6967cdb516
1 changed files with 29 additions and 5 deletions
  1. 29 5
      partitioner.go

+ 29 - 5
partitioner.go

@@ -98,8 +98,9 @@ func (p *roundRobinPartitioner) RequiresConsistency() bool {
 }
 
 type hashPartitioner struct {
-	random Partitioner
-	hasher hash.Hash32
+	random       Partitioner
+	hasher       hash.Hash32
+	referenceAbs bool
 }
 
 // NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher.
@@ -110,6 +111,7 @@ func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor
 		p := new(hashPartitioner)
 		p.random = NewRandomPartitioner(topic)
 		p.hasher = hasher()
+		p.referenceAbs = false
 		return p
 	}
 }
@@ -122,6 +124,19 @@ func NewHashPartitioner(topic string) Partitioner {
 	p := new(hashPartitioner)
 	p.random = NewRandomPartitioner(topic)
 	p.hasher = fnv.New32a()
+	p.referenceAbs = false
+	return p
+}
+
+// NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values
+// in the same way as the reference Java implementation. NewHashPartitioner was supposed to do
+// that but it had a mistake and now there are people depending on both behaviours. This will
+// all go away on the next major version bump.
+func NewReferenceHashPartitioner(topic string) Partitioner {
+	p := new(hashPartitioner)
+	p.random = NewRandomPartitioner(topic)
+	p.hasher = fnv.New32a()
+	p.referenceAbs = true
 	return p
 }
 
@@ -138,9 +153,18 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3
 	if err != nil {
 		return -1, err
 	}
-	partition := int32(p.hasher.Sum32()) % numPartitions
-	if partition < 0 {
-		partition = -partition
+	var partition int32
+	// Turns out we were doing our absolute value in a subtly different way from the upstream
+	// implementation, but now we need to maintain backwards compat for people who started using
+	// the old version; if referenceAbs is set we are compatible with the reference java client
+	// but not past Sarama versions
+	if p.referenceAbs {
+		partition = (int32(p.hasher.Sum32()) & 0x7fffffff) % numPartitions
+	} else {
+		partition = int32(p.hasher.Sum32()) % numPartitions
+		if partition < 0 {
+			partition = -partition
+		}
 	}
 	return partition, nil
 }