consumer_group_test.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. package sarama
  2. import (
  3. "fmt"
  4. )
  5. type exampleConsumerGroupHandler func(sess ConsumerGroupSession, claim ConsumerGroupClaim) error
  6. func (h exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error { return nil }
  7. func (h exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
  8. func (h exampleConsumerGroupHandler) ConsumeClaim(s ConsumerGroupSession, c ConsumerGroupClaim) error {
  9. return h(s, c)
  10. }
  11. func ExampleConsumerGroup() {
  12. // Init config, specify version
  13. config := NewConfig()
  14. config.Version = V1_0_0_0
  15. config.Consumer.Return.Errors = true
  16. // Start with a client
  17. client, err := NewClient([]string{"localhost:9092"}, nil)
  18. if err != nil {
  19. panic(err)
  20. }
  21. defer func() { _ = client.Close() }()
  22. // Start a new consumer group
  23. group, err := NewConsumerGroupFromClient("my-group", client)
  24. if err != nil {
  25. panic(err)
  26. }
  27. defer func() { _ = group.Close() }()
  28. // Track errors
  29. go func() {
  30. for err := range group.Errors() {
  31. fmt.Println("ERROR", err)
  32. }
  33. }()
  34. // Iterate over consumer sessions.
  35. for {
  36. err := group.Consume([]string{"my-topic"}, exampleConsumerGroupHandler(func(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
  37. for msg := range claim.Messages() {
  38. fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
  39. sess.MarkMessage(msg, "")
  40. }
  41. return nil
  42. }))
  43. if err != nil {
  44. panic(err)
  45. }
  46. }
  47. }