|
|
@@ -4,7 +4,6 @@ import (
|
|
|
"hash"
|
|
|
"hash/fnv"
|
|
|
"math/rand"
|
|
|
- "sync"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
@@ -18,7 +17,6 @@ type Partitioner interface {
|
|
|
// RandomPartitioner implements the Partitioner interface by choosing a random partition each time.
|
|
|
type RandomPartitioner struct {
|
|
|
generator *rand.Rand
|
|
|
- m sync.Mutex
|
|
|
}
|
|
|
|
|
|
func NewRandomPartitioner() Partitioner {
|
|
|
@@ -28,15 +26,12 @@ func NewRandomPartitioner() Partitioner {
|
|
|
}
|
|
|
|
|
|
func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) int32 {
|
|
|
- p.m.Lock()
|
|
|
- defer p.m.Unlock()
|
|
|
return int32(p.generator.Intn(int(numPartitions)))
|
|
|
}
|
|
|
|
|
|
// RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
|
|
|
type RoundRobinPartitioner struct {
|
|
|
partition int32
|
|
|
- m sync.Mutex
|
|
|
}
|
|
|
|
|
|
func NewRoundRobinPartitioner() Partitioner {
|
|
|
@@ -44,8 +39,6 @@ func NewRoundRobinPartitioner() Partitioner {
|
|
|
}
|
|
|
|
|
|
func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) int32 {
|
|
|
- p.m.Lock()
|
|
|
- defer p.m.Unlock()
|
|
|
if p.partition >= numPartitions {
|
|
|
p.partition = 0
|
|
|
}
|
|
|
@@ -60,7 +53,6 @@ func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) int3
|
|
|
type HashPartitioner struct {
|
|
|
random Partitioner
|
|
|
hasher hash.Hash32
|
|
|
- m sync.Mutex
|
|
|
}
|
|
|
|
|
|
func NewHashPartitioner() Partitioner {
|
|
|
@@ -71,8 +63,6 @@ func NewHashPartitioner() Partitioner {
|
|
|
}
|
|
|
|
|
|
func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) int32 {
|
|
|
- p.m.Lock()
|
|
|
- defer p.m.Unlock()
|
|
|
if key == nil {
|
|
|
return p.random.Partition(key, numPartitions)
|
|
|
}
|