Jelajahi Sumber

Merge pull request #1112 from evan-stripe/extended-partitioner

Make Partitioner.RequiresConsistency vary per-message
Evan Huus 7 tahun lalu
induk
melakukan
bc4e2742d5
3 mengubah file dengan 45 tambahan dan 1 penghapusan
  1. 8 1
      async_producer.go
  2. 19 0
      partitioner.go
  3. 18 0
      partitioner_test.go

+ 8 - 1
async_producer.go

@@ -344,7 +344,14 @@ func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
 	var partitions []int32
 
 	err := tp.breaker.Run(func() (err error) {
-		if tp.partitioner.RequiresConsistency() {
+		var requiresConsistency = false
+		if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
+			requiresConsistency = ep.MessageRequiresConsistency(msg)
+		} else {
+			requiresConsistency = tp.partitioner.RequiresConsistency()
+		}
+
+		if requiresConsistency {
 			partitions, err = tp.parent.client.Partitions(msg.Topic)
 		} else {
 			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)

+ 19 - 0
partitioner.go

@@ -22,6 +22,21 @@ type Partitioner interface {
 	RequiresConsistency() bool
 }
 
+// DynamicConsistencyPartitioner can optionally be implemented by Partitioners
+// in order to allow more flexibility than is originally allowed by the
+// RequiresConsistency method in the Partitioner interface. This allows
+// partitioners to require consistency sometimes, but not all times. It's useful
+// for, e.g., the HashPartitioner, which does not require consistency if the
+// message key is nil.
+type DynamicConsistencyPartitioner interface {
+	Partitioner
+
+	// MessageRequiresConsistency is similar to Partitioner.RequiresConsistency,
+	// but takes in the message being partitioned so that the partitioner can
+	// make a per-message determination.
+	MessageRequiresConsistency(message *ProducerMessage) bool
+}
+
 // PartitionerConstructor is the type for a function capable of constructing new Partitioners.
 type PartitionerConstructor func(topic string) Partitioner
 
@@ -133,3 +148,7 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3
 func (p *hashPartitioner) RequiresConsistency() bool {
 	return true
 }
+
+func (p *hashPartitioner) MessageRequiresConsistency(message *ProducerMessage) bool {
+	return message.Key != nil
+}

+ 18 - 0
partitioner_test.go

@@ -150,6 +150,24 @@ func TestHashPartitioner(t *testing.T) {
 	}
 }
 
+func TestHashPartitionerConsistency(t *testing.T) {
+	partitioner := NewHashPartitioner("mytopic")
+	ep, ok := partitioner.(DynamicConsistencyPartitioner)
+
+	if !ok {
+		t.Error("Hash partitioner does not implement DynamicConsistencyPartitioner")
+	}
+
+	consistency := ep.MessageRequiresConsistency(&ProducerMessage{Key: StringEncoder("hi")})
+	if !consistency {
+		t.Error("Messages with keys should require consistency")
+	}
+	consistency = ep.MessageRequiresConsistency(&ProducerMessage{})
+	if consistency {
+		t.Error("Messages without keys should require consistency")
+	}
+}
+
 func TestHashPartitionerMinInt32(t *testing.T) {
 	partitioner := NewHashPartitioner("mytopic")