12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 |
- package sarama
- import (
- "context"
- "fmt"
- )
// exampleConsumerGroupHandler is the handler value passed to Consume in
// ExampleConsumerGroup. It is an empty struct: it carries no state, so the
// zero value is ready to use.
type exampleConsumerGroupHandler struct{}
- func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error { return nil }
- func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
- func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
- for msg := range claim.Messages() {
- fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
- sess.MarkMessage(msg, "")
- }
- return nil
- }
- func ExampleConsumerGroup() {
- // Init config, specify appropriate version
- config := NewConfig()
- config.Version = V1_0_0_0
- config.Consumer.Return.Errors = true
- // Start with a client
- client, err := NewClient([]string{"localhost:9092"}, config)
- if err != nil {
- panic(err)
- }
- defer func() { _ = client.Close() }()
- // Start a new consumer group
- group, err := NewConsumerGroupFromClient("my-group", client)
- if err != nil {
- panic(err)
- }
- defer func() { _ = group.Close() }()
- // Track errors
- go func() {
- for err := range group.Errors() {
- fmt.Println("ERROR", err)
- }
- }()
- // Iterate over consumer sessions.
- ctx := context.Background()
- for {
- topics := []string{"my-topic"}
- handler := exampleConsumerGroupHandler{}
- err := group.Consume(ctx, topics, handler)
- if err != nil {
- panic(err)
- }
- }
- }
|