Browse Source

Merge pull request #359 from Shopify/partitioner_examples

Add examples for random and manual partitioning
Willem van Bergen 10 years ago
parent
commit
a88b671e41
1 changed files with 76 additions and 0 deletions
  1. 76 0
      partitioner_test.go

+ 76 - 0
partitioner_test.go

@@ -2,6 +2,7 @@ package sarama
 
 import (
 	"crypto/rand"
+	"log"
 	"testing"
 )
 
@@ -120,3 +121,78 @@ func TestManualPartitioner(t *testing.T) {
 		}
 	}
 }
+
+// By default, Sarama uses the message's key to consistently assign a partition to
+// a message using hashing. If no key is set, a random partition will be chosen.
+// This example shows how you can partition messages randomly, even when a key is set,
+// by overriding Config.Producer.Partitioner.
+func ExamplePartitioner_random() {
+	config := NewConfig()
+	config.Producer.Partitioner = NewRandomPartitioner
+
+	producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer func() {
+		if err := producer.Close(); err != nil {
+			log.Println("Failed to close producer:", err)
+		}
+	}()
+
+	msg := &ProducerMessage{Topic: "test", Key: StringEncoder("key is set"), Value: StringEncoder("test")}
+	partition, offset, err := producer.SendMessage(msg)
+	if err != nil {
+		log.Fatalln("Failed to produce message to kafka cluster.")
+	}
+
+	log.Printf("Produced message to partition %d with offset %d", partition, offset)
+}
+
+// This example shows how to assign partitions to your messages manually.
+func ExamplePartitioner_manual() {
+	config := NewConfig()
+
+	// First, we tell the producer that we are going to partition ourselves.
+	config.Producer.Partitioner = NewManualPartitioner
+
+	producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer func() {
+		if err := producer.Close(); err != nil {
+			log.Println("Failed to close producer:", err)
+		}
+	}()
+
+	// Now, we set the Partition field of the ProducerMessage struct.
+	msg := &ProducerMessage{Topic: "test", Partition: 6, Value: StringEncoder("test")}
+
+	partition, offset, err := producer.SendMessage(msg)
+	if err != nil {
+		log.Fatalln("Failed to produce message to kafka cluster.")
+	}
+
+	if partition != 6 {
+		log.Fatal("Message should have been produced to partition 6!")
+	}
+
+	log.Printf("Produced message to partition %d with offset %d", partition, offset)
+}
+
+// This example shows how to set a different partitioner depending on the topic.
+func ExamplePartitioner_per_topic() {
+	config := NewConfig()
+	config.Producer.Partitioner = func(topic string) Partitioner {
+		switch topic {
+		case "access_log", "error_log":
+			return NewRandomPartitioner(topic)
+
+		default:
+			return NewHashPartitioner(topic)
+		}
+	}
+
+	// ...
+}