浏览代码

serialize HashPartitioner and RoundRobinPartitioner

Burke Libbey 12 年之前
父节点
当前提交
2e9d4aa756
共有 1 个文件被更改,包括 6 次插入0 次删除
  1. 6 0
      partitioner.go

+ 6 - 0
partitioner.go

@@ -36,9 +36,12 @@ func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) int32 {
 // RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
 type RoundRobinPartitioner struct {
 	partition int32
+	m         sync.Mutex
 }
 
 func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) int32 {
+	p.m.Lock()
+	defer p.m.Unlock()
 	if p.partition >= numPartitions {
 		p.partition = 0
 	}
@@ -53,6 +56,7 @@ func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) int3
 type HashPartitioner struct {
 	random *RandomPartitioner
 	hasher hash.Hash32
+	m      sync.Mutex
 }
 
 func NewHashPartitioner() *HashPartitioner {
@@ -63,6 +67,8 @@ func NewHashPartitioner() *HashPartitioner {
 }
 
 func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) int32 {
+	p.m.Lock()
+	defer p.m.Unlock()
 	if key == nil {
 		return p.random.Partition(key, numPartitions)
 	}