Make Partitioner.RequiresConsistency vary per-message

One of the useful features of the random partitioner is that it will
only partition messages onto partitions that are currently online. In
theory, the hash partitioner is capable of exhibiting the same behavior for
messages where the key is unset, but it can't because
RequiresConsistency is a constant for the partitioner. It's also not
feasible to add methods to the Partitioner interface, since that's part
of the public API.

Therefore, this introduces a new interface, DynamicConsistencyPartitioner,
which includes an additional method: MessageRequiresConsistency. This lets the
partitioner make a per-message determination for whether consistency is
required (and therefore whether it's acceptable to publish to partitions
which are offline). The producer library will use this method if it's
available, and fall back on the existing RequiresConsistency method
otherwise.

Since the hash partitioner is currently the only one whose consistency
requirements differ per message, for now it is the only partitioner to
implement MessageRequiresConsistency.
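
For illustration, here is a minimal sketch of how a third-party partitioner
could opt in to the same behavior. The regionPartitioner type and its routing
logic are hypothetical, not part of this commit; the sketch assumes it lives
in the sarama package alongside the types shown in the diffs below:

	// regionPartitioner is a hypothetical partitioner that routes keyed
	// messages deterministically but scatters keyless messages randomly.
	type regionPartitioner struct {
		random Partitioner
	}

	func NewRegionPartitioner(topic string) Partitioner {
		return &regionPartitioner{random: NewRandomPartitioner(topic)}
	}

	func (p *regionPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
		if message.Key == nil {
			return p.random.Partition(message, numPartitions)
		}
		// ... deterministic key-based routing elided ...
		return 0, nil
	}

	// RequiresConsistency keeps the conservative answer so that callers
	// which only know the Partitioner interface remain correct.
	func (p *regionPartitioner) RequiresConsistency() bool {
		return true
	}

	// MessageRequiresConsistency relaxes the requirement for keyless
	// messages, allowing them to go to writable partitions only.
	func (p *regionPartitioner) MessageRequiresConsistency(message *ProducerMessage) bool {
		return message.Key != nil
	}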
Evan Broder, 6 years ago
commit 9b2c4d6e2c
3 changed files with 45 additions and 1 deletion
  1. async_producer.go (+8, -1)
  2. partitioner.go (+19, -0)
  3. partitioner_test.go (+18, -0)

async_producer.go (+8, -1)

@@ -343,7 +343,14 @@ func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
 	var partitions []int32
 
 	err := tp.breaker.Run(func() (err error) {
-		if tp.partitioner.RequiresConsistency() {
+		var requiresConsistency = false
+		if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
+			requiresConsistency = ep.MessageRequiresConsistency(msg)
+		} else {
+			requiresConsistency = tp.partitioner.RequiresConsistency()
+		}
+
+		if requiresConsistency {
 			partitions, err = tp.parent.client.Partitions(msg.Topic)
 		} else {
 			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)

partitioner.go (+19, -0)

@@ -22,6 +22,21 @@ type Partitioner interface {
 	RequiresConsistency() bool
 }
 
+// DynamicConsistencyPartitioner can optionally be implemented by Partitioners
+// in order to allow more flexibility than is originally allowed by the
+// RequiresConsistency method in the Partitioner interface. This allows
+// partitioners to require consistency sometimes, but not all times. It's useful
+// for, e.g., the HashPartitioner, which does not require consistency if the
+// message key is nil.
+type DynamicConsistencyPartitioner interface {
+	Partitioner
+
+	// MessageRequiresConsistency is similar to Partitioner.RequiresConsistency,
+	// but takes in the message being partitioned so that the partitioner can
+	// make a per-message determination.
+	MessageRequiresConsistency(message *ProducerMessage) bool
+}
+
 // PartitionerConstructor is the type for a function capable of constructing new Partitioners.
 type PartitionerConstructor func(topic string) Partitioner
 
@@ -133,3 +148,7 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3
 func (p *hashPartitioner) RequiresConsistency() bool {
 	return true
 }
+
+func (p *hashPartitioner) MessageRequiresConsistency(message *ProducerMessage) bool {
+	return message.Key != nil
+}
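
As a usage sketch: a producer configured with the hash partitioner now spreads
keyless messages across writable partitions only, while keyed messages still
hash over the full partition list. This uses sarama's existing public API; the
broker address is a placeholder and error handling is elided:

	config := NewConfig()
	// NewHashPartitioner satisfies PartitionerConstructor; the producer
	// detects the DynamicConsistencyPartitioner implementation at runtime.
	config.Producer.Partitioner = NewHashPartitioner

	producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		panic(err) // placeholder error handling
	}

	// Key is nil, so the producer consults WritablePartitions and skips
	// offline partitions; a keyed message would use Partitions instead.
	producer.Input() <- &ProducerMessage{Topic: "mytopic", Value: StringEncoder("hello")}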

partitioner_test.go (+18, -0)

@@ -150,6 +150,24 @@ func TestHashPartitioner(t *testing.T) {
 	}
 }
 
+func TestHashPartitionerConsistency(t *testing.T) {
+	partitioner := NewHashPartitioner("mytopic")
+	ep, ok := partitioner.(DynamicConsistencyPartitioner)
+
+	if !ok {
+		t.Error("Hash partitioner does not implement DynamicConsistencyPartitioner")
+	}
+
+	consistency := ep.MessageRequiresConsistency(&ProducerMessage{Key: StringEncoder("hi")})
+	if !consistency {
+		t.Error("Messages with keys should require consistency")
+	}
+	consistency = ep.MessageRequiresConsistency(&ProducerMessage{})
+	if consistency {
+		t.Error("Messages without keys should require consistency")
+	}
+}
+
 func TestHashPartitionerMinInt32(t *testing.T) {
 	partitioner := NewHashPartitioner("mytopic")