Quellcode durchsuchen

Add functionality for custom hash partitioner.

The wrapper function NewCustomHashPartitioner is a wrapper around the PartitionerConstructor, allowing for specifying the hash algorithm, which will be used.
Lyuben Manolov vor 8 Jahren
Ursprung
Commit
cd5b0ea54d
2 geänderte Dateien mit 61 neuen und 0 gelöschten Zeilen
  1. 11 0
      partitioner.go
  2. 50 0
      partitioner_test.go

+ 11 - 0
partitioner.go

@@ -87,6 +87,17 @@ type hashPartitioner struct {
 	hasher hash.Hash32
 }
 
+// NewCustomHashPartitioner is a wrapper around NewHashPartitioner,
+// allowing the use of custom hasher
+func NewCustomHashPartitioner(hasher hash.Hash32) PartitionerConstructor {
+	return func(topic string) Partitioner {
+		p := new(hashPartitioner)
+		p.random = NewRandomPartitioner(topic)
+		p.hasher = hasher
+		return p
+	}
+}
+
 // NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a
 // random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used,
 // modulus the number of partitions. This ensures that messages with the same key always end up on the

+ 50 - 0
partitioner_test.go

@@ -2,6 +2,7 @@ package sarama
 
 import (
 	"crypto/rand"
+	"hash/fnv"
 	"log"
 	"testing"
 )
@@ -70,6 +71,55 @@ func TestRoundRobinPartitioner(t *testing.T) {
 	}
 }
 
+func TestNewHashPartitionerWithHasher(t *testing.T) {
+	// use the current default hasher fnv.New32a()
+	partitioner := NewCustomHashPartitioner(fnv.New32a())("mytopic")
+
+	choice, err := partitioner.Partition(&ProducerMessage{}, 1)
+	if err != nil {
+		t.Error(partitioner, err)
+	}
+	if choice != 0 {
+		t.Error("Returned non-zero partition when only one available.")
+	}
+
+	for i := 1; i < 50; i++ {
+		choice, err := partitioner.Partition(&ProducerMessage{}, 50)
+		if err != nil {
+			t.Error(partitioner, err)
+		}
+		if choice < 0 || choice >= 50 {
+			t.Error("Returned partition", choice, "outside of range for nil key.")
+		}
+	}
+
+	buf := make([]byte, 256)
+	for i := 1; i < 50; i++ {
+		if _, err := rand.Read(buf); err != nil {
+			t.Error(err)
+		}
+		assertPartitioningConsistent(t, partitioner, &ProducerMessage{Key: ByteEncoder(buf)}, 50)
+	}
+}
+
+func TestHashPartitionerWithHasherMinInt32(t *testing.T) {
+	// use the current default hasher fnv.New32a()
+	partitioner := NewCustomHashPartitioner(fnv.New32a())("mytopic")
+
+	msg := ProducerMessage{}
+	// "1468509572224" generates 2147483648 (uint32) result from Sum32 function
+	// which is -2147483648 or int32's min value
+	msg.Key = StringEncoder("1468509572224")
+
+	choice, err := partitioner.Partition(&msg, 50)
+	if err != nil {
+		t.Error(partitioner, err)
+	}
+	if choice < 0 || choice >= 50 {
+		t.Error("Returned partition", choice, "outside of range for nil key.")
+	}
+}
+
 func TestHashPartitioner(t *testing.T) {
 	partitioner := NewHashPartitioner("mytopic")