Просмотр исходного кода

Pass the topic to the partitioner constructor

This permits custom constructors that return different types of partitioners for
different topics.
Evan Huus 11 лет назад
Родитель
Сommit
1101246f70
3 измененных файлов с 11 добавлено и 11 удалено
  1. 1 1
      async_producer.go
  2. 6 6
      partitioner.go
  3. 4 4
      partitioner_test.go

+ 1 - 1
async_producer.go

@@ -262,7 +262,7 @@ func (p *asyncProducer) topicDispatcher() {
 // partitions messages, then dispatches them by partition
 func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMessage) {
 	handlers := make(map[int32]chan *ProducerMessage)
-	partitioner := p.conf.Producer.Partitioner()
+	partitioner := p.conf.Producer.Partitioner(topic)
 
 	for msg := range input {
 		if msg.retries == 0 {

+ 6 - 6
partitioner.go

@@ -20,13 +20,13 @@ type Partitioner interface {
 }
 
 // PartitionerConstructor is the type for a function capable of constructing new Partitioners.
-type PartitionerConstructor func() Partitioner
+type PartitionerConstructor func(topic string) Partitioner
 
 type manualPartitioner struct{}
 
 // NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided
 // ProducerMessage's Partition field as the partition to produce to.
-func NewManualPartitioner() Partitioner {
+func NewManualPartitioner(topic string) Partitioner {
 	return new(manualPartitioner)
 }
 
@@ -43,7 +43,7 @@ type randomPartitioner struct {
 }
 
 // NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
-func NewRandomPartitioner() Partitioner {
+func NewRandomPartitioner(topic string) Partitioner {
 	p := new(randomPartitioner)
 	p.generator = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
 	return p
@@ -62,7 +62,7 @@ type roundRobinPartitioner struct {
 }
 
 // NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.
-func NewRoundRobinPartitioner() Partitioner {
+func NewRoundRobinPartitioner(topic string) Partitioner {
 	return &roundRobinPartitioner{}
 }
 
@@ -88,9 +88,9 @@ type hashPartitioner struct {
 // encode, then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key
 // is used, modulus the number of partitions. This ensures that messages with the same key always end up on the
 // same partition.
-func NewHashPartitioner() Partitioner {
+func NewHashPartitioner(topic string) Partitioner {
 	p := new(hashPartitioner)
-	p.random = NewRandomPartitioner()
+	p.random = NewRandomPartitioner(topic)
 	p.hasher = fnv.New32a()
 	return p
 }

+ 4 - 4
partitioner_test.go

@@ -25,7 +25,7 @@ func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, message
 }
 
 func TestRandomPartitioner(t *testing.T) {
-	partitioner := NewRandomPartitioner()
+	partitioner := NewRandomPartitioner("mytopic")
 
 	choice, err := partitioner.Partition(nil, 1)
 	if err != nil {
@@ -47,7 +47,7 @@ func TestRandomPartitioner(t *testing.T) {
 }
 
 func TestRoundRobinPartitioner(t *testing.T) {
-	partitioner := NewRoundRobinPartitioner()
+	partitioner := NewRoundRobinPartitioner("mytopic")
 
 	choice, err := partitioner.Partition(nil, 1)
 	if err != nil {
@@ -70,7 +70,7 @@ func TestRoundRobinPartitioner(t *testing.T) {
 }
 
 func TestHashPartitioner(t *testing.T) {
-	partitioner := NewHashPartitioner()
+	partitioner := NewHashPartitioner("mytopic")
 
 	choice, err := partitioner.Partition(&ProducerMessage{}, 1)
 	if err != nil {
@@ -100,7 +100,7 @@ func TestHashPartitioner(t *testing.T) {
 }
 
 func TestManualPartitioner(t *testing.T) {
-	partitioner := NewManualPartitioner()
+	partitioner := NewManualPartitioner("mytopic")
 
 	choice, err := partitioner.Partition(&ProducerMessage{}, 1)
 	if err != nil {