Forráskód Böngészése

Add ManualPartitioner

Willem van Bergen 10 éve
szülő
commit
4a1de384e7
2 módosított fájl, 38 hozzáadás és 0 törlés
  1. 16 0
      partitioner.go
  2. 22 0
      partitioner_test.go

+ 16 - 0
partitioner.go

@@ -22,6 +22,22 @@ type Partitioner interface {
 // PartitionerConstructor is the type for a function capable of constructing new Partitioners.
 type PartitionerConstructor func() Partitioner
 
+type manualPartitioner struct{}
+
+// NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided
+// ProducerMessage's Partition field as the partition to produce to.
+func NewManualPartitioner() Partitioner {
+	return new(manualPartitioner)
+}
+
+func (p *manualPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
+	return message.Partition, nil
+}
+
+func (p *manualPartitioner) RequiresConsistency() bool {
+	return true
+}
+
 type randomPartitioner struct {
 	generator *rand.Rand
 }

+ 22 - 0
partitioner_test.go

@@ -98,3 +98,25 @@ func TestHashPartitioner(t *testing.T) {
 		assertPartitioningConsistent(t, partitioner, &ProducerMessage{Key: ByteEncoder(buf)}, 50)
 	}
 }
+
+func TestManualPartitioner(t *testing.T) {
+	partitioner := NewManualPartitioner()
+
+	choice, err := partitioner.Partition(&ProducerMessage{}, 1)
+	if err != nil {
+		t.Error(partitioner, err)
+	}
+	if choice != 0 {
+		t.Error("Returned non-zero partition when only one available.")
+	}
+
+	for i := int32(1); i < 50; i++ {
+		choice, err := partitioner.Partition(&ProducerMessage{Partition: i}, 50)
+		if err != nil {
+			t.Error(partitioner, err)
+		}
+		if choice != i {
+			t.Error("Returned partition not the same as the input partition")
+		}
+	}
+}