Ver código fonte

add customizable partitioner which lets you create a partitioner with most permutations of configurable elements

Michael Ernst 7 anos atrás
pai
commit
30514d0645
1 arquivos alterados com 39 adições e 0 exclusões
  1. 39 0
      partitioner.go

+ 39 - 0
partitioner.go

@@ -42,6 +42,31 @@ type PartitionerConstructor func(topic string) Partitioner
 
 type manualPartitioner struct{}
 
+// HashPartitionOption lets you modify default values of the partitioner
+type HashPartitionerOption func(*hashPartitioner)
+
+// WithAbsFirst means that the partitioner handles absolute values
+// in the same way as the reference Java implementation
+func WithAbsFirst() HashPartitionerOption {
+	return func(hp *hashPartitioner) {
+		hp.referenceAbs = true
+	}
+}
+
+// WithCustomHashFunction lets you specify what hash function to use for the partitioning
+func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption {
+	return func(hp *hashPartitioner) {
+		hp.hasher = hasher()
+	}
+}
+
+// WithCustomRandomPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty
+func WithCustomRandomPartitioner(randomHP *hashPartitioner) HashPartitionerOption {
+	return func(hp *hashPartitioner) {
+		hp.random = hp
+	}
+}
+
 // NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided
 // ProducerMessage's Partition field as the partition to produce to.
 func NewManualPartitioner(topic string) Partitioner {
@@ -116,6 +141,20 @@ func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor
 	}
 }
 
+// NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options
+func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor {
+	return func(topic string) Partitioner {
+		p := new(hashPartitioner)
+		p.random = NewRandomPartitioner(topic)
+		p.hasher = fnv.New32a()
+		p.referenceAbs = false
+		for _, option := range options {
+			option(p)
+		}
+		return p
+	}
+}
+
 // NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a
 // random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used,
 // modulus the number of partitions. This ensures that messages with the same key always end up on the