Bladeren bron

adds a note about consumer groups Consume method

Diego Alvarez 4 jaren geleden
bovenliggende
commit
8b72385c90
3 gewijzigde bestanden met toevoegingen van 11 en 11 verwijderingen
  1. 3 0
      consumer_group.go
  2. 5 11
      consumer_group_test.go
  3. 3 0
      examples/consumergroup/main.go

+ 3 - 0
consumer_group.go

@@ -38,6 +38,9 @@ type ConsumerGroup interface {
 	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
 	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
 	// commit failures.
+	// This method should be called inside an infinite loop, when a
+	// server-side rebalance happens, the consumer session will need to be
+	// recreated to get the new claims.
 	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
 
 	// Errors returns a read channel of errors that occurred during the consumer life-cycle.

+ 5 - 11
consumer_group_test.go

@@ -18,20 +18,11 @@ func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, cla
 }
 
 func ExampleConsumerGroup() {
-	// Init config, specify appropriate version
 	config := NewConfig()
-	config.Version = V1_0_0_0
+	config.Version = V2_0_0_0 // specify appropriate version
 	config.Consumer.Return.Errors = true
 
-	// Start with a client
-	client, err := NewClient([]string{"localhost:9092"}, config)
-	if err != nil {
-		panic(err)
-	}
-	defer func() { _ = client.Close() }()
-
-	// Start a new consumer group
-	group, err := NewConsumerGroupFromClient("my-group", client)
+	group, err := NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
 	if err != nil {
 		panic(err)
 	}
@@ -50,6 +41,9 @@ func ExampleConsumerGroup() {
 		topics := []string{"my-topic"}
 		handler := exampleConsumerGroupHandler{}
 
+		// `Consume` should be called inside an infinite loop, when a
+		// server-side rebalance happens, the consumer session will need to be
+		// recreated to get the new claims
 		err := group.Consume(ctx, topics, handler)
 		if err != nil {
 			panic(err)

+ 3 - 0
examples/consumergroup/main.go

@@ -99,6 +99,9 @@ func main() {
 	go func() {
 		defer wg.Done()
 		for {
+			// `Consume` should be called inside an infinite loop, when a
+			// server-side rebalance happens, the consumer session will need to be
+			// recreated to get the new claims
 			if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
 				log.Panicf("Error from consumer: %v", err)
 			}