Sfoglia il codice sorgente

Add examples for random and manual partitioning.

Willem van Bergen 10 anni fa
parent
commit
f250575968
1 ha cambiato i file con 60 aggiunte e 0 eliminazioni
  1. 60 0
      partitioner_test.go

+ 60 - 0
partitioner_test.go

@@ -2,6 +2,7 @@ package sarama
 
 import (
 	"crypto/rand"
+	"log"
 	"testing"
 )
 
@@ -120,3 +121,62 @@ func TestManualPartitioner(t *testing.T) {
 		}
 	}
 }
+
+// By default, Sarama uses the message's key to consistently assign a partition to
+// a message using hashing. If no key is set, a random partition will be chosen.
+// This example shows how you can partition messages randomly, even when a key is set,
+// by overriding Config.Producer.Partitioner.
+func ExamplePartitioner_random() {
+	config := NewConfig()
+	config.Producer.Partitioner = NewRandomPartitioner
+
+	producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer func() {
+		if err := producer.Close(); err != nil {
+			log.Println("Failed to close producer:", err)
+		}
+	}()
+
+	msg := &ProducerMessage{Topic: "test", Key: StringEncoder("key is set"), Value: StringEncoder("test")}
+	partition, offset, err := producer.SendMessage(msg)
+	if err != nil {
+		log.Fatalln("Failed to produce message to kafka cluster.")
+	}
+
+	log.Printf("Produced message to partition %d with offset %d", partition, offset)
+}
+
+// This example shows how to assign partitions to your messages manually.
+func ExamplePartitioner_manual() {
+	config := NewConfig()
+
+	// First, we tell the producer that we are going to partition ourselves.
+	config.Producer.Partitioner = NewManualPartitioner
+
+	producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer func() {
+		if err := producer.Close(); err != nil {
+			log.Println("Failed to close producer:", err)
+		}
+	}()
+
+	// Now, we set the Partition field of the ProducerMessage struct.
+	msg := &ProducerMessage{Topic: "test", Partition: 6, Value: StringEncoder("test")}
+
+	partition, offset, err := producer.SendMessage(msg)
+	if err != nil {
+		log.Fatalln("Failed to produce message to kafka cluster.")
+	}
+
+	if partition != 6 {
+		log.Fatal("Message should have been produced to partition 6!")
+	}
+
+	log.Printf("Produced message to partition %d with offset %d", partition, offset)
+}