consumer_group_test.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. package sarama
  2. import (
  3. "context"
  4. "fmt"
  5. )
  6. type exampleConsumerGroupHandler struct{}
  7. func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error { return nil }
  8. func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
  9. func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
  10. for msg := range claim.Messages() {
  11. fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
  12. sess.MarkMessage(msg, "")
  13. }
  14. return nil
  15. }
  16. func ExampleConsumerGroup() {
  17. // Init config, specify appropriate version
  18. config := NewConfig()
  19. config.Version = V1_0_0_0
  20. config.Consumer.Return.Errors = true
  21. // Start with a client
  22. client, err := NewClient([]string{"localhost:9092"}, config)
  23. if err != nil {
  24. panic(err)
  25. }
  26. defer func() { _ = client.Close() }()
  27. // Start a new consumer group
  28. group, err := NewConsumerGroupFromClient("my-group", client)
  29. if err != nil {
  30. panic(err)
  31. }
  32. defer func() { _ = group.Close() }()
  33. // Track errors
  34. go func() {
  35. for err := range group.Errors() {
  36. fmt.Println("ERROR", err)
  37. }
  38. }()
  39. // Iterate over consumer sessions.
  40. ctx := context.Background()
  41. for {
  42. topics := []string{"my-topic"}
  43. handler := exampleConsumerGroupHandler{}
  44. err := group.Consume(ctx, topics, handler)
  45. if err != nil {
  46. panic(err)
  47. }
  48. }
  49. }