浏览代码

Misc partition fixes, including making sure they're in the right order

Evan Huus 12 年之前
父节点
当前提交
daabaa3c99
共有 4 个文件被更改,包括 34 次插入7 次删除
  1. 5 1
      metadata_cache.go
  2. 3 3
      partition_chooser.go
  3. 11 3
      producer.go
  4. 15 0
      utils.go

+ 5 - 1
metadata_cache.go

@@ -1,6 +1,9 @@
 package kafka
 package kafka
 
 
-import "sync"
+import (
+	"sort"
+	"sync"
+)
 
 
 type metadataCache struct {
 type metadataCache struct {
 	client  *Client
 	client  *Client
@@ -74,6 +77,7 @@ func (mc *metadataCache) partitions(topic string) []int32 {
 		ret = append(ret, id)
 		ret = append(ret, id)
 	}
 	}
 
 
+	sort.Sort(int32Slice(ret))
 	return ret
 	return ret
 }
 }
 
 

+ 3 - 3
partition_chooser.go

@@ -3,12 +3,12 @@ package kafka
 import "math/rand"
 import "math/rand"
 
 
 type PartitionChooser interface {
 type PartitionChooser interface {
-	ChoosePartition(key encoder, partitions []int32) int32
+	ChoosePartition(key encoder, partitions int) int
 }
 }
 
 
 type RandomPartitioner struct {
 type RandomPartitioner struct {
 }
 }
 
 
-func (p RandomPartitioner) ChoosePartition(key encoder, partitions []int32) int32 {
-	return partitions[rand.Intn(len(partitions))]
+func (p RandomPartitioner) ChoosePartition(key encoder, partitions int) int {
+	return rand.Intn(partitions)
 }
 }

+ 11 - 3
producer.go

@@ -16,10 +16,10 @@ func NewSimpleProducer(client *Client, topic string) *Producer {
 	return NewProducer(client, topic, RandomPartitioner{}, WAIT_FOR_LOCAL, 0)
 	return NewProducer(client, topic, RandomPartitioner{}, WAIT_FOR_LOCAL, 0)
 }
 }
 
 
-func (p *Producer) SendMessage(key, value encoder) (*ProduceResponse, error) {
+func (p *Producer) choosePartition(key encoder) (int32, error) {
 	partitions, err := p.Partitions(p.topic)
 	partitions, err := p.Partitions(p.topic)
 	if err != nil {
 	if err != nil {
-		return nil, err
+		return -1, err
 	}
 	}
 
 
 	var partitioner PartitionChooser
 	var partitioner PartitionChooser
@@ -28,7 +28,15 @@ func (p *Producer) SendMessage(key, value encoder) (*ProduceResponse, error) {
 	} else {
 	} else {
 		partitioner = p.partitioner
 		partitioner = p.partitioner
 	}
 	}
-	partition := partitioner.ChoosePartition(key, partitions)
+
+	return partitions[partitioner.ChoosePartition(key, len(partitions))], nil
+}
+
+func (p *Producer) SendMessage(key, value encoder) (*ProduceResponse, error) {
+	partition, err := p.choosePartition(key)
+	if err != nil {
+		return nil, err
+	}
 
 
 	msg, err := newMessage(key, value)
 	msg, err := newMessage(key, value)
 	if err != nil {
 	if err != nil {

+ 15 - 0
utils.go

@@ -0,0 +1,15 @@
+package kafka
+
+type int32Slice []int32
+
+func (slice int32Slice) Len() int {
+	return len(slice)
+}
+
+func (slice int32Slice) Less(i, j int) bool {
+	return slice[i] < slice[j]
+}
+
+func (slice int32Slice) Swap(i, j int) {
+	slice[i], slice[j] = slice[j], slice[i]
+}