Browse Source

Fix shutdown and race-condition in consumer-group example

Scott Kidder 5 years ago
parent
commit
aea5ab960e
1 changed files with 22 additions and 8 deletions
  1. 22 8
      examples/consumergroup/main.go

+ 22 - 8
examples/consumergroup/main.go

@@ -7,6 +7,7 @@ import (
 	"os"
 	"os/signal"
 	"strings"
+	"sync"
 	"syscall"
 
 	"github.com/Shopify/sarama"
@@ -70,21 +71,30 @@ func main() {
 	/**
 	 * Setup a new Sarama consumer group
 	 */
-	consumer := Consumer{}
+	consumer := Consumer{
+		ready: make(chan bool, 0),
+	}
 
-	ctx := context.Background()
+	ctx, cancel := context.WithCancel(context.Background())
 	client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
 	if err != nil {
 		panic(err)
 	}
 
+	wg := &sync.WaitGroup{}
 	go func() {
+		wg.Add(1)
+		defer wg.Done()
 		for {
-			consumer.ready = make(chan bool, 0)
 			err := client.Consume(ctx, strings.Split(topics, ","), &consumer)
 			if err != nil {
 				panic(err)
 			}
+			// check if context was cancelled, signaling that the consumer should stop
+			if ctx.Err() != nil {
+				return
+			}
+			consumer.ready = make(chan bool, 0)
 		}
 	}()
 
@@ -93,11 +103,15 @@ func main() {
 
 	sigterm := make(chan os.Signal, 1)
 	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
-
-	<-sigterm // Await a sigterm signal before safely closing the consumer
-
-	err = client.Close()
-	if err != nil {
+	select {
+	case <-ctx.Done():
+		log.Println("terminating: context cancelled")
+	case <-sigterm:
+		log.Println("terminating: via signal")
+	}
+	cancel()
+	wg.Wait()
+	if err = client.Close(); err != nil {
 		panic(err)
 	}
 }