Parcourir la source

Merge pull request #326 from Shopify/producer_example

Add an example for using the async producer with goroutines
Willem van Bergen il y a 10 ans
Parent
commit
14bae79f1b
1 fichiers modifiés avec 76 ajouts et 6 suppressions
  1. 76 6
      producer_test.go

+ 76 - 6
producer_test.go

@@ -1,7 +1,9 @@
 package sarama
 
 import (
-	"fmt"
+	"log"
+	"os"
+	"os/signal"
 	"sync"
 	"testing"
 )
@@ -507,25 +509,93 @@ func TestProducerMultipleRetries(t *testing.T) {
 	closeProducer(t, producer)
 }
 
-func ExampleProducer() {
+// This example shows how to use the producer while simultaneously
+// reading the Errors channel to know about any failures.
+func ExampleProducer_select() {
 	producer, err := NewProducer([]string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	}
+
 	defer func() {
 		if err := producer.Close(); err != nil {
-			panic(err)
+			log.Fatalln(err)
 		}
 	}()
 
+	// Trap SIGINT to trigger a shutdown.
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, os.Interrupt)
+
+	var enqueued, errors int
+ProducerLoop:
 	for {
 		select {
 		case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
-			fmt.Println("> message queued")
+			enqueued++
 		case err := <-producer.Errors():
-			panic(err.Err)
+			log.Println("Failed to produce message", err)
+			errors++
+		case <-signals:
+			break ProducerLoop
+		}
+	}
+
+	log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
+}
+
+// This example shows how to use the producer with separate goroutines
+// reading from the Successes and Errors channels.
+func ExampleProducer_goroutines() {
+	config := NewConfig()
+	config.Producer.AckSuccesses = true
+	producer, err := NewProducer([]string{"localhost:9092"}, config)
+	if err != nil {
+		panic(err)
+	}
+
+	// Trap SIGINT to trigger a graceful shutdown.
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, os.Interrupt)
+
+	var (
+		wg                          sync.WaitGroup
+		enqueued, successes, errors int
+	)
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for _ = range producer.Successes() {
+			successes++
+		}
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for err := range producer.Errors() {
+			log.Println(err)
+			errors++
+		}
+	}()
+
+ProducerLoop:
+	for {
+		message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
+		select {
+		case producer.Input() <- message:
+			enqueued++
+
+		case <-signals:
+			producer.AsyncClose() // Trigger a shutdown of the producer.
+			break ProducerLoop
 		}
 	}
+
+	wg.Wait()
+
+	log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
 }
 
 func ExampleSyncProducer() {
@@ -544,7 +614,7 @@ func ExampleSyncProducer() {
 		if err != nil {
 			panic(err)
 		} else {
-			fmt.Printf("> message sent to partition %d at offset %d\n", partition, offset)
+			log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
 		}
 	}
 }