瀏覽代碼

Rework Partition interface and usage

- permit Partition() to return an error, since the HashPartitioner can
  theoretically hit errors calculating the hash or materializing the key
- have each partitioner indicate whether it requires consistency or not

A partitioner that requires consistency (primarily the HashPartitioner) is one
for whom "partition not available" means "fail the message" as opposed to "try
some other partition" (which is better behaviour for the RandomPartitioner, for
example). Respect this value in the producer by using Partitions() or
WritablePartitions() depending.
Evan Huus 11 年之前
父節點
當前提交
3020cf16c6
共有 3 個文件被更改,包括 83 次插入24 次删除
  1. 32 11
      partitioner.go
  2. 38 10
      partitioner_test.go
  3. 13 3
      producer.go

+ 32 - 11
partitioner.go

@@ -11,7 +11,12 @@ import (
 // decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided
 // as simple default implementations.
 type Partitioner interface {
-	Partition(key Encoder, numPartitions int32) int32
+	Partition(key Encoder, numPartitions int32) (int32, error) // Partition takes the key and partition count and chooses a partition
+
+	// RequiresConsistency indicates to the user of the partitioner whether the mapping of key->partition is consistent or not.
+	// Specifically, if a partitioner requires consistency then it must be allowed to choose from all partitions (even ones known to
+	// be unavailable), and its choice must be respected by the caller. The obvious example is the HashPartitioner.
+	RequiresConsistency() bool
 }
 
 // PartitionerConstructor is the type for a function capable of constructing new Partitioners.
@@ -28,8 +33,12 @@ func NewRandomPartitioner() Partitioner {
 	return p
 }
 
-func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) int32 {
-	return int32(p.generator.Intn(int(numPartitions)))
+func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
+	return int32(p.generator.Intn(int(numPartitions))), nil
+}
+
+func (p *RandomPartitioner) RequiresConsistency() bool {
+	return false
 }
 
 // RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
@@ -41,13 +50,17 @@ func NewRoundRobinPartitioner() Partitioner {
 	return &RoundRobinPartitioner{}
 }
 
-func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) int32 {
+func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
 	if p.partition >= numPartitions {
 		p.partition = 0
 	}
 	ret := p.partition
 	p.partition++
-	return ret
+	return ret, nil
+}
+
+func (p *RoundRobinPartitioner) RequiresConsistency() bool {
+	return false
 }
 
 // HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition
@@ -65,24 +78,28 @@ func NewHashPartitioner() Partitioner {
 	return p
 }
 
-func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) int32 {
+func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
 	if key == nil {
 		return p.random.Partition(key, numPartitions)
 	}
 	bytes, err := key.Encode()
 	if err != nil {
-		return p.random.Partition(key, numPartitions)
+		return -1, err
 	}
 	p.hasher.Reset()
 	_, err = p.hasher.Write(bytes)
 	if err != nil {
-		return p.random.Partition(key, numPartitions)
+		return -1, err
 	}
 	hash := int32(p.hasher.Sum32())
 	if hash < 0 {
 		hash = -hash
 	}
-	return hash % numPartitions
+	return hash % numPartitions, nil
+}
+
+func (p *HashPartitioner) RequiresConsistency() bool {
+	return true
 }
 
 // ConstantPartitioner implements the Partitioner interface by just returning a constant value.
@@ -90,6 +107,10 @@ type ConstantPartitioner struct {
 	Constant int32
 }
 
-func (p *ConstantPartitioner) Partition(key Encoder, numPartitions int32) int32 {
-	return p.Constant
+func (p *ConstantPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
+	return p.Constant, nil
+}
+
+func (p *ConstantPartitioner) RequiresConsistency() bool {
+	return true
 }

+ 38 - 10
partitioner_test.go

@@ -6,12 +6,18 @@ import (
 )
 
 func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, key Encoder, numPartitions int32) {
-	choice := partitioner.Partition(key, numPartitions)
+	choice, err := partitioner.Partition(key, numPartitions)
+	if err != nil {
+		t.Error(partitioner, err)
+	}
 	if choice < 0 || choice >= numPartitions {
 		t.Error(partitioner, "returned partition", choice, "outside of range for", key)
 	}
 	for i := 1; i < 50; i++ {
-		newChoice := partitioner.Partition(key, numPartitions)
+		newChoice, err := partitioner.Partition(key, numPartitions)
+		if err != nil {
+			t.Error(partitioner, err)
+		}
 		if newChoice != choice {
 			t.Error(partitioner, "returned partition", newChoice, "inconsistent with", choice, ".")
 		}
@@ -21,13 +27,19 @@ func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, key Enc
 func TestRandomPartitioner(t *testing.T) {
 	partitioner := NewRandomPartitioner()
 
-	choice := partitioner.Partition(nil, 1)
+	choice, err := partitioner.Partition(nil, 1)
+	if err != nil {
+		t.Error(partitioner, err)
+	}
 	if choice != 0 {
 		t.Error("Returned non-zero partition when only one available.")
 	}
 
 	for i := 1; i < 50; i++ {
-		choice := partitioner.Partition(nil, 50)
+		choice, err := partitioner.Partition(nil, 50)
+		if err != nil {
+			t.Error(partitioner, err)
+		}
 		if choice < 0 || choice >= 50 {
 			t.Error("Returned partition", choice, "outside of range.")
 		}
@@ -37,14 +49,20 @@ func TestRandomPartitioner(t *testing.T) {
 func TestRoundRobinPartitioner(t *testing.T) {
 	partitioner := RoundRobinPartitioner{}
 
-	choice := partitioner.Partition(nil, 1)
+	choice, err := partitioner.Partition(nil, 1)
+	if err != nil {
+		t.Error(partitioner, err)
+	}
 	if choice != 0 {
 		t.Error("Returned non-zero partition when only one available.")
 	}
 
 	var i int32
 	for i = 1; i < 50; i++ {
-		choice := partitioner.Partition(nil, 7)
+		choice, err := partitioner.Partition(nil, 7)
+		if err != nil {
+			t.Error(partitioner, err)
+		}
 		if choice != i%7 {
 			t.Error("Returned partition", choice, "expecting", i%7)
 		}
@@ -54,13 +72,19 @@ func TestRoundRobinPartitioner(t *testing.T) {
 func TestHashPartitioner(t *testing.T) {
 	partitioner := NewHashPartitioner()
 
-	choice := partitioner.Partition(nil, 1)
+	choice, err := partitioner.Partition(nil, 1)
+	if err != nil {
+		t.Error(partitioner, err)
+	}
 	if choice != 0 {
 		t.Error("Returned non-zero partition when only one available.")
 	}
 
 	for i := 1; i < 50; i++ {
-		choice := partitioner.Partition(nil, 50)
+		choice, err := partitioner.Partition(nil, 50)
+		if err != nil {
+			t.Error(partitioner, err)
+		}
 		if choice < 0 || choice >= 50 {
 			t.Error("Returned partition", choice, "outside of range for nil key.")
 		}
@@ -74,10 +98,14 @@ func TestHashPartitioner(t *testing.T) {
 }
 
 func TestConstantPartitioner(t *testing.T) {
-	partitioner := &ConstantPartitioner{Constant: 0}
+	var partitioner Partitioner
+	partitioner = &ConstantPartitioner{Constant: 0}
 
 	for i := 1; i < 50; i++ {
-		choice := partitioner.Partition(nil, 50)
+		choice, err := partitioner.Partition(nil, 50)
+		if err != nil {
+			t.Error(partitioner, err)
+		}
 		if choice != 0 {
 			t.Error("Returned partition", choice, "instead of 0.")
 		}

+ 13 - 3
producer.go

@@ -617,7 +617,15 @@ func (p *Producer) retryHandler() {
 // utility functions
 
 func (p *Producer) assignPartition(partitioner Partitioner, msg *MessageToSend) error {
-	partitions, err := p.client.Partitions(msg.Topic)
+	var partitions []int32
+	var err error
+
+	if partitioner.RequiresConsistency() {
+		partitions, err = p.client.Partitions(msg.Topic)
+	} else {
+		partitions, err = p.client.WritablePartitions(msg.Topic)
+	}
+
 	if err != nil {
 		return err
 	}
@@ -628,9 +636,11 @@ func (p *Producer) assignPartition(partitioner Partitioner, msg *MessageToSend)
 		return LeaderNotAvailable
 	}
 
-	choice := partitioner.Partition(msg.Key, numPartitions)
+	choice, err := partitioner.Partition(msg.Key, numPartitions)
 
-	if choice < 0 || choice >= numPartitions {
+	if err != nil {
+		return err
+	} else if choice < 0 || choice >= numPartitions {
 		return InvalidPartition
 	}