package sarama import ( "crypto/rand" "testing" ) func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, key Encoder, numPartitions int32) { choice := partitioner.Partition(key, numPartitions) if choice < 0 || choice >= numPartitions { t.Error(partitioner, "returned partition", choice, "outside of range for", key) } for i := 1; i < 50; i++ { newChoice := partitioner.Partition(key, numPartitions) if newChoice != choice { t.Error(partitioner, "returned partition", newChoice, "inconsistent with", choice, ".") } } } func TestRandomPartitioner(t *testing.T) { partitioner := NewRandomPartitioner() choice := partitioner.Partition(nil, 1) if choice != 0 { t.Error("Returned non-zero partition when only one available.") } for i := 1; i < 50; i++ { choice := partitioner.Partition(nil, 50) if choice < 0 || choice >= 50 { t.Error("Returned partition", choice, "outside of range.") } } } func TestRoundRobinPartitioner(t *testing.T) { partitioner := RoundRobinPartitioner{} choice := partitioner.Partition(nil, 1) if choice != 0 { t.Error("Returned non-zero partition when only one available.") } var i int32 for i = 1; i < 50; i++ { choice := partitioner.Partition(nil, 7) if choice != i%7 { t.Error("Returned partition", choice, "expecting", i%7) } } } func TestHashPartitioner(t *testing.T) { partitioner := NewHashPartitioner() choice := partitioner.Partition(nil, 1) if choice != 0 { t.Error("Returned non-zero partition when only one available.") } for i := 1; i < 50; i++ { choice := partitioner.Partition(nil, 50) if choice < 0 || choice >= 50 { t.Error("Returned partition", choice, "outside of range for nil key.") } } buf := make([]byte, 256) for i := 1; i < 50; i++ { rand.Read(buf) assertPartitioningConsistent(t, partitioner, ByteEncoder(buf), 50) } }