Browse Source

Merge pull request #356 from Shopify/topic-partitioning

Pass the topic to the partitioner constructor
Evan Huus 10 years ago
parent
commit
fee49d0a00
3 changed files with 11 additions and 11 deletions
  1. 1 1
      async_producer.go
  2. 6 6
      partitioner.go
  3. 4 4
      partitioner_test.go

+ 1 - 1
async_producer.go

@@ -262,7 +262,7 @@ func (p *asyncProducer) topicDispatcher() {
 // partitions messages, then dispatches them by partition
 // partitions messages, then dispatches them by partition
 func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMessage) {
 func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMessage) {
 	handlers := make(map[int32]chan *ProducerMessage)
 	handlers := make(map[int32]chan *ProducerMessage)
-	partitioner := p.conf.Producer.Partitioner()
+	partitioner := p.conf.Producer.Partitioner(topic)
 
 
 	for msg := range input {
 	for msg := range input {
 		if msg.retries == 0 {
 		if msg.retries == 0 {

+ 6 - 6
partitioner.go

@@ -20,13 +20,13 @@ type Partitioner interface {
 }
 }
 
 
 // PartitionerConstructor is the type for a function capable of constructing new Partitioners.
 // PartitionerConstructor is the type for a function capable of constructing new Partitioners.
-type PartitionerConstructor func() Partitioner
+type PartitionerConstructor func(topic string) Partitioner
 
 
 type manualPartitioner struct{}
 type manualPartitioner struct{}
 
 
 // NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided
 // NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided
 // ProducerMessage's Partition field as the partition to produce to.
 // ProducerMessage's Partition field as the partition to produce to.
-func NewManualPartitioner() Partitioner {
+func NewManualPartitioner(topic string) Partitioner {
 	return new(manualPartitioner)
 	return new(manualPartitioner)
 }
 }
 
 
@@ -43,7 +43,7 @@ type randomPartitioner struct {
 }
 }
 
 
 // NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
 // NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
-func NewRandomPartitioner() Partitioner {
+func NewRandomPartitioner(topic string) Partitioner {
 	p := new(randomPartitioner)
 	p := new(randomPartitioner)
 	p.generator = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
 	p.generator = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
 	return p
 	return p
@@ -62,7 +62,7 @@ type roundRobinPartitioner struct {
 }
 }
 
 
 // NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.
 // NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.
-func NewRoundRobinPartitioner() Partitioner {
+func NewRoundRobinPartitioner(topic string) Partitioner {
 	return &roundRobinPartitioner{}
 	return &roundRobinPartitioner{}
 }
 }
 
 
@@ -88,9 +88,9 @@ type hashPartitioner struct {
 // encode, then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key
 // encode, then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key
 // is used, modulus the number of partitions. This ensures that messages with the same key always end up on the
 // is used, modulus the number of partitions. This ensures that messages with the same key always end up on the
 // same partition.
 // same partition.
-func NewHashPartitioner() Partitioner {
+func NewHashPartitioner(topic string) Partitioner {
 	p := new(hashPartitioner)
 	p := new(hashPartitioner)
-	p.random = NewRandomPartitioner()
+	p.random = NewRandomPartitioner(topic)
 	p.hasher = fnv.New32a()
 	p.hasher = fnv.New32a()
 	return p
 	return p
 }
 }

+ 4 - 4
partitioner_test.go

@@ -25,7 +25,7 @@ func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, message
 }
 }
 
 
 func TestRandomPartitioner(t *testing.T) {
 func TestRandomPartitioner(t *testing.T) {
-	partitioner := NewRandomPartitioner()
+	partitioner := NewRandomPartitioner("mytopic")
 
 
 	choice, err := partitioner.Partition(nil, 1)
 	choice, err := partitioner.Partition(nil, 1)
 	if err != nil {
 	if err != nil {
@@ -47,7 +47,7 @@ func TestRandomPartitioner(t *testing.T) {
 }
 }
 
 
 func TestRoundRobinPartitioner(t *testing.T) {
 func TestRoundRobinPartitioner(t *testing.T) {
-	partitioner := NewRoundRobinPartitioner()
+	partitioner := NewRoundRobinPartitioner("mytopic")
 
 
 	choice, err := partitioner.Partition(nil, 1)
 	choice, err := partitioner.Partition(nil, 1)
 	if err != nil {
 	if err != nil {
@@ -70,7 +70,7 @@ func TestRoundRobinPartitioner(t *testing.T) {
 }
 }
 
 
 func TestHashPartitioner(t *testing.T) {
 func TestHashPartitioner(t *testing.T) {
-	partitioner := NewHashPartitioner()
+	partitioner := NewHashPartitioner("mytopic")
 
 
 	choice, err := partitioner.Partition(&ProducerMessage{}, 1)
 	choice, err := partitioner.Partition(&ProducerMessage{}, 1)
 	if err != nil {
 	if err != nil {
@@ -100,7 +100,7 @@ func TestHashPartitioner(t *testing.T) {
 }
 }
 
 
 func TestManualPartitioner(t *testing.T) {
 func TestManualPartitioner(t *testing.T) {
-	partitioner := NewManualPartitioner()
+	partitioner := NewManualPartitioner("mytopic")
 
 
 	choice, err := partitioner.Partition(&ProducerMessage{}, 1)
 	choice, err := partitioner.Partition(&ProducerMessage{}, 1)
 	if err != nil {
 	if err != nil {