consumer_group_test.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. package sarama
  2. import (
  3. "context"
  4. "fmt"
  5. )
  6. type exampleConsumerGroupHandler struct{}
  7. func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error { return nil }
  8. func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
  9. func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
  10. for msg := range claim.Messages() {
  11. fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
  12. sess.MarkMessage(msg, "")
  13. }
  14. return nil
  15. }
  16. func ExampleConsumerGroup() {
  17. config := NewConfig()
  18. config.Version = V2_0_0_0 // specify appropriate version
  19. config.Consumer.Return.Errors = true
  20. group, err := NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
  21. if err != nil {
  22. panic(err)
  23. }
  24. defer func() { _ = group.Close() }()
  25. // Track errors
  26. go func() {
  27. for err := range group.Errors() {
  28. fmt.Println("ERROR", err)
  29. }
  30. }()
  31. // Iterate over consumer sessions.
  32. ctx := context.Background()
  33. for {
  34. topics := []string{"my-topic"}
  35. handler := exampleConsumerGroupHandler{}
  36. // `Consume` should be called inside an infinite loop, when a
  37. // server-side rebalance happens, the consumer session will need to be
  38. // recreated to get the new claims
  39. err := group.Consume(ctx, topics, handler)
  40. if err != nil {
  41. panic(err)
  42. }
  43. }
  44. }