Browse Source

Merge pull request #1404 from skidder/sk-fix-shutdown-for-consumer-group-example

Fix shutdown and race-condition in consumer-group example
Vlad Gorodetsky 5 years ago
parent
commit
4154a59639
1 changed files with 27 additions and 14 deletions
  1. 27 14
      examples/consumergroup/main.go

+ 27 - 14
examples/consumergroup/main.go

@@ -7,6 +7,7 @@ import (
 	"os"
 	"os/signal"
 	"strings"
+	"sync"
 	"syscall"
 
 	"github.com/Shopify/sarama"
@@ -53,7 +54,7 @@ func main() {
 
 	version, err := sarama.ParseKafkaVersion(version)
 	if err != nil {
-		panic(err)
+		log.Panicf("Error parsing Kafka version: %v", err)
 	}
 
 	/**
@@ -70,21 +71,29 @@ func main() {
 	/**
 	 * Setup a new Sarama consumer group
 	 */
-	consumer := Consumer{}
+	consumer := Consumer{
+		ready: make(chan bool, 0),
+	}
 
-	ctx := context.Background()
+	ctx, cancel := context.WithCancel(context.Background())
 	client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
 	if err != nil {
-		panic(err)
+		log.Panicf("Error creating consumer group client: %v", err)
 	}
 
+	wg := &sync.WaitGroup{}
 	go func() {
+		wg.Add(1)
+		defer wg.Done()
 		for {
-			consumer.ready = make(chan bool, 0)
-			err := client.Consume(ctx, strings.Split(topics, ","), &consumer)
-			if err != nil {
-				panic(err)
+			if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
+				log.Panicf("Error from consumer: %v", err)
 			}
+			// check if context was cancelled, signaling that the consumer should stop
+			if ctx.Err() != nil {
+				return
+			}
+			consumer.ready = make(chan bool, 0)
 		}
 	}()
 
@@ -93,12 +102,16 @@ func main() {
 
 	sigterm := make(chan os.Signal, 1)
 	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
-
-	<-sigterm // Await a sigterm signal before safely closing the consumer
-
-	err = client.Close()
-	if err != nil {
-		panic(err)
+	select {
+	case <-ctx.Done():
+		log.Println("terminating: context cancelled")
+	case <-sigterm:
+		log.Println("terminating: via signal")
+	}
+	cancel()
+	wg.Wait()
+	if err = client.Close(); err != nil {
+		log.Panicf("Error closing client: %v", err)
 	}
 }