Browse Source

Merge pull request #333 from Shopify/partition_based_on_entire_message

Partitioners operate on the entire ProducerMessage, not just the key
Willem van Bergen 10 years ago
parent
commit
e5245babad
3 changed files with 20 additions and 19 deletions
  1. 12 11
      partitioner.go
  2. 7 7
      partitioner_test.go
  3. 1 1
      producer.go

+ 12 - 11
partitioner.go

@@ -7,11 +7,11 @@ import (
 	"time"
 )
 
-// Partitioner is anything that, given a Kafka message key and a number of partitions indexed [0...numPartitions-1],
+// Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1],
 // decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided
 // as simple default implementations.
 type Partitioner interface {
-	Partition(key Encoder, numPartitions int32) (int32, error) // Partition takes the key and partition count and chooses a partition
+	Partition(message *ProducerMessage, numPartitions int32) (int32, error) // Partition takes a message and partition count and chooses a partition
 
 	// RequiresConsistency indicates to the user of the partitioner whether the mapping of key->partition is consistent or not.
 	// Specifically, if a partitioner requires consistency then it must be allowed to choose from all partitions (even ones known to
@@ -33,7 +33,7 @@ func NewRandomPartitioner() Partitioner {
 	return p
 }
 
-func (p *randomPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
+func (p *randomPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
 	return int32(p.generator.Intn(int(numPartitions))), nil
 }
 
@@ -50,7 +50,7 @@ func NewRoundRobinPartitioner() Partitioner {
 	return &roundRobinPartitioner{}
 }
 
-func (p *roundRobinPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
+func (p *roundRobinPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
 	if p.partition >= numPartitions {
 		p.partition = 0
 	}
@@ -68,9 +68,10 @@ type hashPartitioner struct {
 	hasher hash.Hash32
 }
 
-// NewHashPartitioner returns a Partitioner which behaves as follows. If the key is nil, or fails to encode, then a random partition
-// is chosen. Otherwise the FNV-1a hash of the encoded bytes is used modulus the number of partitions. This ensures that messages
-// with the same key always end up on the same partition.
+// NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil, or fails to
+// encode, then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key
+// is used, modulus the number of partitions. This ensures that messages with the same key always end up on the
+// same partition.
 func NewHashPartitioner() Partitioner {
 	p := new(hashPartitioner)
 	p.random = NewRandomPartitioner()
@@ -78,11 +79,11 @@ func NewHashPartitioner() Partitioner {
 	return p
 }
 
-func (p *hashPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
-	if key == nil {
-		return p.random.Partition(key, numPartitions)
+func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
+	if message.Key == nil {
+		return p.random.Partition(message, numPartitions)
 	}
-	bytes, err := key.Encode()
+	bytes, err := message.Key.Encode()
 	if err != nil {
 		return -1, err
 	}

+ 7 - 7
partitioner_test.go

@@ -5,16 +5,16 @@ import (
 	"testing"
 )
 
-func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, key Encoder, numPartitions int32) {
-	choice, err := partitioner.Partition(key, numPartitions)
+func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, message *ProducerMessage, numPartitions int32) {
+	choice, err := partitioner.Partition(message, numPartitions)
 	if err != nil {
 		t.Error(partitioner, err)
 	}
 	if choice < 0 || choice >= numPartitions {
-		t.Error(partitioner, "returned partition", choice, "outside of range for", key)
+		t.Error(partitioner, "returned partition", choice, "outside of range for", message)
 	}
 	for i := 1; i < 50; i++ {
-		newChoice, err := partitioner.Partition(key, numPartitions)
+		newChoice, err := partitioner.Partition(message, numPartitions)
 		if err != nil {
 			t.Error(partitioner, err)
 		}
@@ -72,7 +72,7 @@ func TestRoundRobinPartitioner(t *testing.T) {
 func TestHashPartitioner(t *testing.T) {
 	partitioner := NewHashPartitioner()
 
-	choice, err := partitioner.Partition(nil, 1)
+	choice, err := partitioner.Partition(&ProducerMessage{}, 1)
 	if err != nil {
 		t.Error(partitioner, err)
 	}
@@ -81,7 +81,7 @@ func TestHashPartitioner(t *testing.T) {
 	}
 
 	for i := 1; i < 50; i++ {
-		choice, err := partitioner.Partition(nil, 50)
+		choice, err := partitioner.Partition(&ProducerMessage{}, 50)
 		if err != nil {
 			t.Error(partitioner, err)
 		}
@@ -95,6 +95,6 @@ func TestHashPartitioner(t *testing.T) {
 		if _, err := rand.Read(buf); err != nil {
 			t.Error(err)
 		}
-		assertPartitioningConsistent(t, partitioner, ByteEncoder(buf), 50)
+		assertPartitioningConsistent(t, partitioner, &ProducerMessage{Key: ByteEncoder(buf)}, 50)
 	}
 }

+ 1 - 1
producer.go

@@ -653,7 +653,7 @@ func (p *producer) assignPartition(partitioner Partitioner, msg *ProducerMessage
 		return ErrLeaderNotAvailable
 	}
 
-	choice, err := partitioner.Partition(msg.Key, numPartitions)
+	choice, err := partitioner.Partition(msg, numPartitions)
 
 	if err != nil {
 		return err