123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- package sarama
- import (
- "crypto/rand"
- "hash/fnv"
- "log"
- "testing"
- )
- func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, message *ProducerMessage, numPartitions int32) {
- choice, err := partitioner.Partition(message, numPartitions)
- if err != nil {
- t.Error(partitioner, err)
- }
- if choice < 0 || choice >= numPartitions {
- t.Error(partitioner, "returned partition", choice, "outside of range for", message)
- }
- for i := 1; i < 50; i++ {
- newChoice, err := partitioner.Partition(message, numPartitions)
- if err != nil {
- t.Error(partitioner, err)
- }
- if newChoice != choice {
- t.Error(partitioner, "returned partition", newChoice, "inconsistent with", choice, ".")
- }
- }
- }
- func TestRandomPartitioner(t *testing.T) {
- partitioner := NewRandomPartitioner("mytopic")
- choice, err := partitioner.Partition(nil, 1)
- if err != nil {
- t.Error(partitioner, err)
- }
- if choice != 0 {
- t.Error("Returned non-zero partition when only one available.")
- }
- for i := 1; i < 50; i++ {
- choice, err := partitioner.Partition(nil, 50)
- if err != nil {
- t.Error(partitioner, err)
- }
- if choice < 0 || choice >= 50 {
- t.Error("Returned partition", choice, "outside of range.")
- }
- }
- }
- func TestRoundRobinPartitioner(t *testing.T) {
- partitioner := NewRoundRobinPartitioner("mytopic")
- choice, err := partitioner.Partition(nil, 1)
- if err != nil {
- t.Error(partitioner, err)
- }
- if choice != 0 {
- t.Error("Returned non-zero partition when only one available.")
- }
- var i int32
- for i = 1; i < 50; i++ {
- choice, err := partitioner.Partition(nil, 7)
- if err != nil {
- t.Error(partitioner, err)
- }
- if choice != i%7 {
- t.Error("Returned partition", choice, "expecting", i%7)
- }
- }
- }
- func TestNewHashPartitionerWithHasher(t *testing.T) {
- // use the current default hasher fnv.New32a()
- partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic")
- choice, err := partitioner.Partition(&ProducerMessage{}, 1)
- if err != nil {
- t.Error(partitioner, err)
- }
- if choice != 0 {
- t.Error("Returned non-zero partition when only one available.")
- }
- for i := 1; i < 50; i++ {
- choice, err := partitioner.Partition(&ProducerMessage{}, 50)
- if err != nil {
- t.Error(partitioner, err)
- }
- if choice < 0 || choice >= 50 {
- t.Error("Returned partition", choice, "outside of range for nil key.")
- }
- }
- buf := make([]byte, 256)
- for i := 1; i < 50; i++ {
- if _, err := rand.Read(buf); err != nil {
- t.Error(err)
- }
- assertPartitioningConsistent(t, partitioner, &ProducerMessage{Key: ByteEncoder(buf)}, 50)
- }
- }
- func TestHashPartitionerWithHasherMinInt32(t *testing.T) {
- // use the current default hasher fnv.New32a()
- partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic")
- msg := ProducerMessage{}
- // "1468509572224" generates 2147483648 (uint32) result from Sum32 function
- // which is -2147483648 or int32's min value
- msg.Key = StringEncoder("1468509572224")
- choice, err := partitioner.Partition(&msg, 50)
- if err != nil {
- t.Error(partitioner, err)
- }
- if choice < 0 || choice >= 50 {
- t.Error("Returned partition", choice, "outside of range for nil key.")
- }
- }
- func TestHashPartitioner(t *testing.T) {
- partitioner := NewHashPartitioner("mytopic")
- choice, err := partitioner.Partition(&ProducerMessage{}, 1)
- if err != nil {
- t.Error(partitioner, err)
- }
- if choice != 0 {
- t.Error("Returned non-zero partition when only one available.")
- }
- for i := 1; i < 50; i++ {
- choice, err := partitioner.Partition(&ProducerMessage{}, 50)
- if err != nil {
- t.Error(partitioner, err)
- }
- if choice < 0 || choice >= 50 {
- t.Error("Returned partition", choice, "outside of range for nil key.")
- }
- }
- buf := make([]byte, 256)
- for i := 1; i < 50; i++ {
- if _, err := rand.Read(buf); err != nil {
- t.Error(err)
- }
- assertPartitioningConsistent(t, partitioner, &ProducerMessage{Key: ByteEncoder(buf)}, 50)
- }
- }
- func TestHashPartitionerConsistency(t *testing.T) {
- partitioner := NewHashPartitioner("mytopic")
- ep, ok := partitioner.(DynamicConsistencyPartitioner)
- if !ok {
- t.Error("Hash partitioner does not implement DynamicConsistencyPartitioner")
- }
- consistency := ep.MessageRequiresConsistency(&ProducerMessage{Key: StringEncoder("hi")})
- if !consistency {
- t.Error("Messages with keys should require consistency")
- }
- consistency = ep.MessageRequiresConsistency(&ProducerMessage{})
- if consistency {
- t.Error("Messages without keys should require consistency")
- }
- }
- func TestHashPartitionerMinInt32(t *testing.T) {
- partitioner := NewHashPartitioner("mytopic")
- msg := ProducerMessage{}
- // "1468509572224" generates 2147483648 (uint32) result from Sum32 function
- // which is -2147483648 or int32's min value
- msg.Key = StringEncoder("1468509572224")
- choice, err := partitioner.Partition(&msg, 50)
- if err != nil {
- t.Error(partitioner, err)
- }
- if choice < 0 || choice >= 50 {
- t.Error("Returned partition", choice, "outside of range for nil key.")
- }
- }
- func TestManualPartitioner(t *testing.T) {
- partitioner := NewManualPartitioner("mytopic")
- choice, err := partitioner.Partition(&ProducerMessage{}, 1)
- if err != nil {
- t.Error(partitioner, err)
- }
- if choice != 0 {
- t.Error("Returned non-zero partition when only one available.")
- }
- for i := int32(1); i < 50; i++ {
- choice, err := partitioner.Partition(&ProducerMessage{Partition: i}, 50)
- if err != nil {
- t.Error(partitioner, err)
- }
- if choice != i {
- t.Error("Returned partition not the same as the input partition")
- }
- }
- }
- // By default, Sarama uses the message's key to consistently assign a partition to
- // a message using hashing. If no key is set, a random partition will be chosen.
- // This example shows how you can partition messages randomly, even when a key is set,
- // by overriding Config.Producer.Partitioner.
- func ExamplePartitioner_random() {
- config := NewConfig()
- config.Producer.Partitioner = NewRandomPartitioner
- producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
- if err != nil {
- log.Fatal(err)
- }
- defer func() {
- if err := producer.Close(); err != nil {
- log.Println("Failed to close producer:", err)
- }
- }()
- msg := &ProducerMessage{Topic: "test", Key: StringEncoder("key is set"), Value: StringEncoder("test")}
- partition, offset, err := producer.SendMessage(msg)
- if err != nil {
- log.Fatalln("Failed to produce message to kafka cluster.")
- }
- log.Printf("Produced message to partition %d with offset %d", partition, offset)
- }
- // This example shows how to assign partitions to your messages manually.
- func ExamplePartitioner_manual() {
- config := NewConfig()
- // First, we tell the producer that we are going to partition ourselves.
- config.Producer.Partitioner = NewManualPartitioner
- producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
- if err != nil {
- log.Fatal(err)
- }
- defer func() {
- if err := producer.Close(); err != nil {
- log.Println("Failed to close producer:", err)
- }
- }()
- // Now, we set the Partition field of the ProducerMessage struct.
- msg := &ProducerMessage{Topic: "test", Partition: 6, Value: StringEncoder("test")}
- partition, offset, err := producer.SendMessage(msg)
- if err != nil {
- log.Fatalln("Failed to produce message to kafka cluster.")
- }
- if partition != 6 {
- log.Fatal("Message should have been produced to partition 6!")
- }
- log.Printf("Produced message to partition %d with offset %d", partition, offset)
- }
- // This example shows how to set a different partitioner depending on the topic.
- func ExamplePartitioner_per_topic() {
- config := NewConfig()
- config.Producer.Partitioner = func(topic string) Partitioner {
- switch topic {
- case "access_log", "error_log":
- return NewRandomPartitioner(topic)
- default:
- return NewHashPartitioner(topic)
- }
- }
- // ...
- }
|