consumer_group_test.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package sarama
  2. import (
  3. "context"
  4. "fmt"
  5. )
  6. type exampleConsumerGroupHandler func(sess ConsumerGroupSession, claim ConsumerGroupClaim) error
  7. func (h exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error { return nil }
  8. func (h exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
  9. func (h exampleConsumerGroupHandler) ConsumeClaim(s ConsumerGroupSession, c ConsumerGroupClaim) error {
  10. return h(s, c)
  11. }
  12. func ExampleConsumerGroup() {
  13. // Init config, specify version
  14. config := NewConfig()
  15. config.Version = V1_0_0_0
  16. config.Consumer.Return.Errors = true
  17. // Start with a client
  18. client, err := NewClient([]string{"localhost:9092"}, nil)
  19. if err != nil {
  20. panic(err)
  21. }
  22. defer func() { _ = client.Close() }()
  23. // Start a new consumer group
  24. group, err := NewConsumerGroupFromClient("my-group", client)
  25. if err != nil {
  26. panic(err)
  27. }
  28. defer func() { _ = group.Close() }()
  29. // Track errors
  30. go func() {
  31. for err := range group.Errors() {
  32. fmt.Println("ERROR", err)
  33. }
  34. }()
  35. // Iterate over consumer sessions.
  36. ctx := context.Background()
  37. for {
  38. err := group.Consume(ctx, []string{"my-topic"}, exampleConsumerGroupHandler(func(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
  39. for msg := range claim.Messages() {
  40. fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
  41. sess.MarkMessage(msg, "")
  42. }
  43. return nil
  44. }))
  45. if err != nil {
  46. panic(err)
  47. }
  48. }
  49. }