|
|
@@ -22,35 +22,35 @@ type Partitioner interface {
|
|
|
// PartitionerConstructor is the type for a function capable of constructing new Partitioners.
|
|
|
type PartitionerConstructor func() Partitioner
|
|
|
|
|
|
-// RandomPartitioner implements the Partitioner interface by choosing a random partition each time.
|
|
|
-type RandomPartitioner struct {
|
|
|
+type randomPartitioner struct {
|
|
|
generator *rand.Rand
|
|
|
}
|
|
|
|
|
|
+// NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
|
|
|
func NewRandomPartitioner() Partitioner {
|
|
|
- p := new(RandomPartitioner)
|
|
|
+ p := new(randomPartitioner)
|
|
|
p.generator = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
|
|
|
return p
|
|
|
}
|
|
|
|
|
|
-func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
|
|
|
+func (p *randomPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
|
|
|
return int32(p.generator.Intn(int(numPartitions))), nil
|
|
|
}
|
|
|
|
|
|
-func (p *RandomPartitioner) RequiresConsistency() bool {
|
|
|
+func (p *randomPartitioner) RequiresConsistency() bool {
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
-// RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
|
|
|
-type RoundRobinPartitioner struct {
|
|
|
+type roundRobinPartitioner struct {
|
|
|
partition int32
|
|
|
}
|
|
|
|
|
|
+// NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.
|
|
|
func NewRoundRobinPartitioner() Partitioner {
|
|
|
- return &RoundRobinPartitioner{}
|
|
|
+ return &roundRobinPartitioner{}
|
|
|
}
|
|
|
|
|
|
-func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
|
|
|
+func (p *roundRobinPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
|
|
|
if p.partition >= numPartitions {
|
|
|
p.partition = 0
|
|
|
}
|
|
|
@@ -59,26 +59,26 @@ func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) (int
|
|
|
return ret, nil
|
|
|
}
|
|
|
|
|
|
-func (p *RoundRobinPartitioner) RequiresConsistency() bool {
|
|
|
+func (p *roundRobinPartitioner) RequiresConsistency() bool {
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
-// HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition
|
|
|
-// is chosen. Otherwise the FNV-1a hash of the encoded bytes is used modulus the number of partitions. This ensures that messages
|
|
|
-// with the same key always end up on the same partition.
|
|
|
-type HashPartitioner struct {
|
|
|
+type hashPartitioner struct {
|
|
|
random Partitioner
|
|
|
hasher hash.Hash32
|
|
|
}
|
|
|
|
|
|
+// NewHashPartitioner returns a Partitioner which behaves as follows. If the key is nil, or fails to encode, then a random partition
|
|
|
+// is chosen. Otherwise the FNV-1a hash of the encoded bytes is used modulus the number of partitions. This ensures that messages
|
|
|
+// with the same key always end up on the same partition.
|
|
|
func NewHashPartitioner() Partitioner {
|
|
|
- p := new(HashPartitioner)
|
|
|
+ p := new(hashPartitioner)
|
|
|
p.random = NewRandomPartitioner()
|
|
|
p.hasher = fnv.New32a()
|
|
|
return p
|
|
|
}
|
|
|
|
|
|
-func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
|
|
|
+func (p *hashPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
|
|
|
if key == nil {
|
|
|
return p.random.Partition(key, numPartitions)
|
|
|
}
|
|
|
@@ -98,19 +98,6 @@ func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) (int32, er
|
|
|
return hash % numPartitions, nil
|
|
|
}
|
|
|
|
|
|
-func (p *HashPartitioner) RequiresConsistency() bool {
|
|
|
- return true
|
|
|
-}
|
|
|
-
|
|
|
-// ConstantPartitioner implements the Partitioner interface by just returning a constant value.
|
|
|
-type ConstantPartitioner struct {
|
|
|
- Constant int32
|
|
|
-}
|
|
|
-
|
|
|
-func (p *ConstantPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
|
|
|
- return p.Constant, nil
|
|
|
-}
|
|
|
-
|
|
|
-func (p *ConstantPartitioner) RequiresConsistency() bool {
|
|
|
+func (p *hashPartitioner) RequiresConsistency() bool {
|
|
|
return true
|
|
|
}
|