package sarama

import (
    "context"
    "errors"
    "fmt"
    "sort"
    "sync"
    "time"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
    // Consume joins a cluster of consumers for a given list of topics and
    // starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
    //
    // The life-cycle of a session is represented by the following steps:
    //
    // 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
    //    and are assigned their "fair share" of partitions, aka 'claims'.
    // 2. Before processing starts, the handler's Setup() hook is called to notify the user
    //    of the claims and allow any necessary preparation or alteration of state.
    // 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
    //    in a separate goroutine, which requires it to be thread-safe. Any state must be carefully protected
    //    from concurrent reads/writes.
    // 4. The session will persist until one of the ConsumeClaim() functions exits. This can happen either when the
    //    parent context is cancelled or when a server-side rebalance cycle is initiated.
    // 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
    //    to allow the user to perform any final tasks before a rebalance.
    // 6. Finally, marked offsets are committed one last time before claims are released.
    //
    // Please note that once a rebalance is triggered, sessions must be completed within
    // Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
    // as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
    // is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
    // commit failures.
    //
    // This method should be called inside an infinite loop; when a
    // server-side rebalance happens, the consumer session will need to be
    // recreated to get the new claims.
    Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

    // Errors returns a read channel of errors that occurred during the consumer life-cycle.
    // By default, errors are logged and not returned over this channel.
    // If you want to implement any custom error handling, set your config's
    // Consumer.Return.Errors setting to true, and read from this channel.
    Errors() <-chan error

    // Close stops the ConsumerGroup and detaches any running sessions. It is required to call
    // this function before the object passes out of scope, as it will otherwise leak memory.
    Close() error
}

type consumerGroup struct {
    client Client

    config   *Config
    consumer Consumer
    groupID  string
    memberID string
    errors   chan error

    lock      sync.Mutex
    closed    chan none
    closeOnce sync.Once

    userData []byte
}

// NewConsumerGroup creates a new consumer group for the given broker addresses and configuration.
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
    client, err := NewClient(addrs, config)
    if err != nil {
        return nil, err
    }

    c, err := newConsumerGroup(groupID, client)
    if err != nil {
        _ = client.Close()
    }
    return c, err
}
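// A minimal usage sketch of the API above, assuming an application-defined
// exampleHandler type that implements ConsumerGroupHandler (the handler name,
// broker address, group ID and topic below are illustrative only):
//
//      config := sarama.NewConfig()
//      config.Version = sarama.V2_0_0_0
//      config.Consumer.Return.Errors = true
//
//      group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
//      if err != nil {
//              log.Fatalln(err)
//      }
//      defer group.Close()
//
//      // With Consumer.Return.Errors enabled, errors must be drained from Errors().
//      go func() {
//              for err := range group.Errors() {
//                      log.Println("consumer group error:", err)
//              }
//      }()
//
//      ctx := context.Background()
//      for {
//              // Consume blocks for the lifetime of a session and must be called again
//              // after every server-side rebalance to pick up the new claims.
//              if err := group.Consume(ctx, []string{"example-topic"}, exampleHandler{}); err != nil {
//                      log.Fatalln(err)
//              }
//              if ctx.Err() != nil {
//                      return
//              }
//      }
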
// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
    // For clients passed in by the caller, ensure we don't
    // call Close() on them.
    cli := &nopCloserClient{client}
    return newConsumerGroup(groupID, cli)
}

func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
    config := client.Config()
    if !config.Version.IsAtLeast(V0_10_2_0) {
        return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
    }

    consumer, err := NewConsumerFromClient(client)
    if err != nil {
        return nil, err
    }

    return &consumerGroup{
        client:   client,
        consumer: consumer,
        config:   config,
        groupID:  groupID,
        errors:   make(chan error, config.ChannelBufferSize),
        closed:   make(chan none),
    }, nil
}

// Errors implements ConsumerGroup.
func (c *consumerGroup) Errors() <-chan error { return c.errors }

// Close implements ConsumerGroup.
func (c *consumerGroup) Close() (err error) {
    c.closeOnce.Do(func() {
        close(c.closed)

        // leave group
        if e := c.leave(); e != nil {
            err = e
        }

        // drain errors
        go func() {
            close(c.errors)
        }()
        for e := range c.errors {
            err = e
        }

        if e := c.client.Close(); e != nil {
            err = e
        }
    })
    return
}

// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
    // Ensure group is not closed
    select {
    case <-c.closed:
        return ErrClosedConsumerGroup
    default:
    }

    c.lock.Lock()
    defer c.lock.Unlock()

    // Quick exit when no topics are provided
    if len(topics) == 0 {
        return fmt.Errorf("no topics provided")
    }

    // Refresh metadata for requested topics
    if err := c.client.RefreshMetadata(topics...); err != nil {
        return err
    }

    // Init session
    sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
    if err == ErrClosedClient {
        return ErrClosedConsumerGroup
    } else if err != nil {
        return err
    }

    // Periodically check whether the partition count of any subscribed topic has
    // changed; if it has, cancel the session to trigger a rebalance. Running the
    // check inside the session (rather than per Consume call) ensures repeated
    // Consume calls do not spawn more than one checking goroutine.
    go c.loopCheckPartitionNumbers(topics, sess)

    // Wait for session exit signal
    <-sess.ctx.Done()

    // Gracefully release session claims
    return sess.release(true)
}

func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
    select {
    case <-c.closed:
        return nil, ErrClosedConsumerGroup
    case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
    }

    if refreshCoordinator {
        err := c.client.RefreshCoordinator(c.groupID)
        if err != nil {
            return c.retryNewSession(ctx, topics, handler, retries, true)
        }
    }

    return c.newSession(ctx, topics, handler, retries-1)
}

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
    coordinator, err := c.client.Coordinator(c.groupID)
    if err != nil {
        if retries <= 0 {
            return nil, err
        }

        return c.retryNewSession(ctx, topics, handler, retries, true)
    }

    // Join consumer group
    join, err := c.joinGroupRequest(coordinator, topics)
    if err != nil {
        _ = coordinator.Close()
        return nil, err
    }
    switch join.Err {
    case ErrNoError:
        c.memberID = join.MemberId
    case ErrUnknownMemberId, ErrIllegalGeneration:
        // reset member ID and retry immediately
        c.memberID = ""
        return c.newSession(ctx, topics, handler, retries)
    case ErrNotCoordinatorForConsumer:
        // retry after backoff with coordinator refresh
        if retries <= 0 {
            return nil, join.Err
        }

        return c.retryNewSession(ctx, topics, handler, retries, true)
    case ErrRebalanceInProgress:
        // retry after backoff
        if retries <= 0 {
            return nil, join.Err
        }

        return c.retryNewSession(ctx, topics, handler, retries, false)
    default:
        return nil, join.Err
    }

    // Prepare distribution plan if we joined as the leader
    var plan BalanceStrategyPlan
    if join.LeaderId == join.MemberId {
        members, err := join.GetMembers()
        if err != nil {
            return nil, err
        }

        plan, err = c.balance(members)
        if err != nil {
            return nil, err
        }
    }

    // Sync consumer group
    sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
    if err != nil {
        _ = coordinator.Close()
        return nil, err
    }
    switch sync.Err {
    case ErrNoError:
    case ErrUnknownMemberId, ErrIllegalGeneration:
        // reset member ID and retry immediately
        c.memberID = ""
        return c.newSession(ctx, topics, handler, retries)
    case ErrNotCoordinatorForConsumer:
        // retry after backoff with coordinator refresh
        if retries <= 0 {
            return nil, sync.Err
        }

        return c.retryNewSession(ctx, topics, handler, retries, true)
    case ErrRebalanceInProgress:
        // retry after backoff
        if retries <= 0 {
            return nil, sync.Err
        }

        return c.retryNewSession(ctx, topics, handler, retries, false)
    default:
        return nil, sync.Err
    }

    // Retrieve and sort claims
    var claims map[string][]int32
    if len(sync.MemberAssignment) > 0 {
        members, err := sync.GetMemberAssignment()
        if err != nil {
            return nil, err
        }
        claims = members.Topics
        c.userData = members.UserData

        for _, partitions := range claims {
            sort.Sort(int32Slice(partitions))
        }
    }

    return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}

func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
    req := &JoinGroupRequest{
        GroupId:        c.groupID,
        MemberId:       c.memberID,
        SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond),
        ProtocolType:   "consumer",
    }
    if c.config.Version.IsAtLeast(V0_10_1_0) {
        req.Version = 1
        req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
    }

    // use static user-data if configured, otherwise use consumer-group userdata from the last sync
    userData := c.config.Consumer.Group.Member.UserData
    if len(userData) == 0 {
        userData = c.userData
    }
    meta := &ConsumerGroupMemberMetadata{
        Topics:   topics,
        UserData: userData,
    }
    strategy := c.config.Consumer.Group.Rebalance.Strategy
    if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
        return nil, err
    }

    return coordinator.JoinGroup(req)
}

func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
    req := &SyncGroupRequest{
        GroupId:      c.groupID,
        MemberId:     c.memberID,
        GenerationId: generationID,
    }
    strategy := c.config.Consumer.Group.Rebalance.Strategy
    for memberID, topics := range plan {
        assignment := &ConsumerGroupMemberAssignment{Topics: topics}
        userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
        if err != nil {
            return nil, err
        }
        assignment.UserData = userDataBytes
        if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
            return nil, err
        }
    }

    return coordinator.SyncGroup(req)
}

func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
    req := &HeartbeatRequest{
        GroupId:      c.groupID,
        MemberId:     memberID,
        GenerationId: generationID,
    }

    return coordinator.Heartbeat(req)
}
func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
    topics := make(map[string][]int32)
    for _, meta := range members {
        for _, topic := range meta.Topics {
            topics[topic] = nil
        }
    }

    for topic := range topics {
        partitions, err := c.client.Partitions(topic)
        if err != nil {
            return nil, err
        }
        topics[topic] = partitions
    }

    strategy := c.config.Consumer.Group.Rebalance.Strategy
    return strategy.Plan(members, topics)
}

// Leaves the cluster, called by Close.
func (c *consumerGroup) leave() error {
    c.lock.Lock()
    defer c.lock.Unlock()

    if c.memberID == "" {
        return nil
    }

    coordinator, err := c.client.Coordinator(c.groupID)
    if err != nil {
        return err
    }

    resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
        GroupId:  c.groupID,
        MemberId: c.memberID,
    })
    if err != nil {
        _ = coordinator.Close()
        return err
    }

    // Unset memberID
    c.memberID = ""

    // Check response
    switch resp.Err {
    case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
        return nil
    default:
        return resp.Err
    }
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
    if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
        err = &ConsumerError{
            Topic:     topic,
            Partition: partition,
            Err:       err,
        }
    }

    if !c.config.Consumer.Return.Errors {
        Logger.Println(err)
        return
    }

    select {
    case <-c.closed:
        // consumer is closed
        return
    default:
    }

    select {
    case c.errors <- err:
    default:
        // no error listener
    }
}

func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
    pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
    defer session.cancel()
    defer pause.Stop()
    var oldTopicToPartitionNum map[string]int
    var err error
    if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {
        return
    }
    for {
        if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil {
            return
        } else {
            for topic, num := range oldTopicToPartitionNum {
                if newTopicToPartitionNum[topic] != num {
                    return // trigger the end of the session on exit
                }
            }
        }
        select {
        case <-pause.C:
        case <-session.ctx.Done():
            Logger.Printf("loop check partition number coroutine will exit, topics %s", topics)
            // the session was closed elsewhere, so this goroutine should exit as well
            return
        case <-c.closed:
            return
        }
    }
}

func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
    topicToPartitionNum := make(map[string]int, len(topics))
    for _, topic := range topics {
        if partitionNum, err := c.client.Partitions(topic); err != nil {
            Logger.Printf("Consumer Group topic %s get partition number failed %v", topic, err)
            return nil, err
        } else {
            topicToPartitionNum[topic] = len(partitionNum)
        }
    }
    return topicToPartitionNum, nil
}
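// The partition assignment computed in balance (above) is delegated to the
// BalanceStrategy configured in Config.Consumer.Group.Rebalance.Strategy. A
// configuration sketch, assuming one of the strategies shipped with this
// package is wanted instead of the default range strategy:
//
//      config := sarama.NewConfig()
//      config.Version = sarama.V2_0_0_0
//      config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
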
// --------------------------------------------------------------------

// ConsumerGroupSession represents a consumer group member session.
type ConsumerGroupSession interface {
    // Claims returns information about the claimed partitions by topic.
    Claims() map[string][]int32

    // MemberID returns the cluster member ID.
    MemberID() string

    // GenerationID returns the current generation ID.
    GenerationID() int32

    // MarkOffset marks the provided offset, alongside a metadata string
    // that represents the state of the partition consumer at that point in time. The
    // metadata string can be used by another consumer to restore that state, so it
    // can resume consumption.
    //
    // To follow upstream conventions, you are expected to mark the offset of the
    // next message to read, not the last message read. Thus, when calling `MarkOffset`
    // you should typically add one to the offset of the last consumed message.
    //
    // Note: calling MarkOffset does not necessarily commit the offset to the backend
    // store immediately for efficiency reasons, and it may never be committed if
    // your application crashes. This means that you may end up processing the same
    // message twice, and your processing should ideally be idempotent.
    MarkOffset(topic string, partition int32, offset int64, metadata string)

    // Commit commits the marked offsets to the backend.
    //
    // Note: calling Commit performs a blocking synchronous operation.
    Commit()

    // ResetOffset resets to the provided offset, alongside a metadata string that
    // represents the state of the partition consumer at that point in time. Reset
    // acts as a counterpart to MarkOffset, the difference being that it allows
    // resetting an offset to an earlier or smaller value, where MarkOffset only
    // allows incrementing the offset. See MarkOffset for more details.
    ResetOffset(topic string, partition int32, offset int64, metadata string)

    // MarkMessage marks a message as consumed.
    MarkMessage(msg *ConsumerMessage, metadata string)

    // Context returns the session context.
    Context() context.Context
}
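// To illustrate the MarkOffset convention documented above: after handling a
// *ConsumerMessage msg inside ConsumeClaim, marking the offset of the next
// message by hand is equivalent to calling MarkMessage, which adds one for you:
//
//      sess.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, "")
//      // or, equivalently:
//      sess.MarkMessage(msg, "")
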
type consumerGroupSession struct {
    parent       *consumerGroup
    memberID     string
    generationID int32
    handler      ConsumerGroupHandler

    claims  map[string][]int32
    offsets *offsetManager
    ctx     context.Context
    cancel  func()

    waitGroup       sync.WaitGroup
    releaseOnce     sync.Once
    hbDying, hbDead chan none
}

func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
    // init offset manager
    offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
    if err != nil {
        return nil, err
    }

    // init context
    ctx, cancel := context.WithCancel(ctx)

    // init session
    sess := &consumerGroupSession{
        parent:       parent,
        memberID:     memberID,
        generationID: generationID,
        handler:      handler,
        offsets:      offsets,
        claims:       claims,
        ctx:          ctx,
        cancel:       cancel,
        hbDying:      make(chan none),
        hbDead:       make(chan none),
    }

    // start heartbeat loop
    go sess.heartbeatLoop()

    // create a POM for each claim
    for topic, partitions := range claims {
        for _, partition := range partitions {
            pom, err := offsets.ManagePartition(topic, partition)
            if err != nil {
                _ = sess.release(false)
                return nil, err
            }

            // handle POM errors
            go func(topic string, partition int32) {
                for err := range pom.Errors() {
                    sess.parent.handleError(err, topic, partition)
                }
            }(topic, partition)
        }
    }

    // perform setup
    if err := handler.Setup(sess); err != nil {
        _ = sess.release(true)
        return nil, err
    }

    // start consuming
    for topic, partitions := range claims {
        for _, partition := range partitions {
            sess.waitGroup.Add(1)

            go func(topic string, partition int32) {
                defer sess.waitGroup.Done()

                // cancel the session as soon as the first
                // goroutine exits
                defer sess.cancel()

                // consume a single topic/partition, blocking
                sess.consume(topic, partition)
            }(topic, partition)
        }
    }
    return sess, nil
}

func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }
func (s *consumerGroupSession) MemberID() string           { return s.memberID }
func (s *consumerGroupSession) GenerationID() int32        { return s.generationID }

func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
    if pom := s.offsets.findPOM(topic, partition); pom != nil {
        pom.MarkOffset(offset, metadata)
    }
}

func (s *consumerGroupSession) Commit() {
    s.offsets.Commit()
}

func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
    if pom := s.offsets.findPOM(topic, partition); pom != nil {
        pom.ResetOffset(offset, metadata)
    }
}

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
    s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}

func (s *consumerGroupSession) Context() context.Context {
    return s.ctx
}

func (s *consumerGroupSession) consume(topic string, partition int32) {
    // quick exit if rebalance is due
    select {
    case <-s.ctx.Done():
        return
    case <-s.parent.closed:
        return
    default:
    }

    // get next offset
    offset := s.parent.config.Consumer.Offsets.Initial
    if pom := s.offsets.findPOM(topic, partition); pom != nil {
        offset, _ = pom.NextOffset()
    }

    // create new claim
    claim, err := newConsumerGroupClaim(s, topic, partition, offset)
    if err != nil {
        s.parent.handleError(err, topic, partition)
        return
    }

    // handle errors
    go func() {
        for err := range claim.Errors() {
            s.parent.handleError(err, topic, partition)
        }
    }()

    // trigger close when session is done
    go func() {
        select {
        case <-s.ctx.Done():
        case <-s.parent.closed:
        }
        claim.AsyncClose()
    }()

    // start processing
    if err := s.handler.ConsumeClaim(s, claim); err != nil {
        s.parent.handleError(err, topic, partition)
    }

    // ensure consumer is closed & drained
    claim.AsyncClose()
    for _, err := range claim.waitClosed() {
        s.parent.handleError(err, topic, partition)
    }
}

func (s *consumerGroupSession) release(withCleanup bool) (err error) {
    // signal release, stop heartbeat
    s.cancel()

    // wait for consumers to exit
    s.waitGroup.Wait()

    // perform release
    s.releaseOnce.Do(func() {
        if withCleanup {
            if e := s.handler.Cleanup(s); e != nil {
                s.parent.handleError(e, "", -1)
                err = e
            }
        }

        if e := s.offsets.Close(); e != nil {
            err = e
        }

        close(s.hbDying)
        <-s.hbDead
    })

    return
}

func (s *consumerGroupSession) heartbeatLoop() {
    defer close(s.hbDead)
    defer s.cancel() // trigger the end of the session on exit
    pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
    defer pause.Stop()

    retries := s.parent.config.Metadata.Retry.Max
    for {
        coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
        if err != nil {
            if retries <= 0 {
                s.parent.handleError(err, "", -1)
                return
            }

            select {
            case <-s.hbDying:
                return
            case <-time.After(s.parent.config.Metadata.Retry.Backoff):
                retries--
            }
            continue
        }

        resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
        if err != nil {
            _ = coordinator.Close()

            if retries <= 0 {
                s.parent.handleError(err, "", -1)
                return
            }

            retries--
            continue
        }

        switch resp.Err {
        case ErrNoError:
            retries = s.parent.config.Metadata.Retry.Max
        case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
            return
        default:
            s.parent.handleError(resp.Err, "", -1)
            return
        }

        select {
        case <-pause.C:
        case <-s.hbDying:
            return
        }
    }
}

// --------------------------------------------------------------------

// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// The handler also provides hooks for the consumer group session life-cycle and allows you to
// trigger logic before or after the consume loop(s).
//
// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently;
// ensure that all state is safely protected against race conditions.
type ConsumerGroupHandler interface {
    // Setup is run at the beginning of a new session, before ConsumeClaim.
    Setup(ConsumerGroupSession) error

    // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
    // but before the offsets are committed for the very last time.
    Cleanup(ConsumerGroupSession) error

    // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
    // Once the Messages() channel is closed, the Handler must finish its processing
    // loop and exit.
    ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupClaim interface {
    // Topic returns the consumed topic name.
    Topic() string

    // Partition returns the consumed partition.
    Partition() int32

    // InitialOffset returns the initial offset that was used as a starting point for this claim.
    InitialOffset() int64

    // HighWaterMarkOffset returns the high water mark offset of the partition,
    // i.e. the offset that will be used for the next message that will be produced.
    // You can use this to determine how far behind the processing is.
    HighWaterMarkOffset() int64

    // Messages returns the read channel for the messages that are returned by
    // the broker. The messages channel will be closed when a new rebalance cycle
    // is due. You must finish processing and mark offsets within
    // Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
    // re-assigned to another group member.
    Messages() <-chan *ConsumerMessage
}

type consumerGroupClaim struct {
    topic     string
    partition int32
    offset    int64
    PartitionConsumer
}

func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
    pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
    if err == ErrOffsetOutOfRange {
        offset = sess.parent.config.Consumer.Offsets.Initial
        pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
    }
    if err != nil {
        return nil, err
    }

    go func() {
        for err := range pcm.Errors() {
            sess.parent.handleError(err, topic, partition)
        }
    }()

    return &consumerGroupClaim{
        topic:             topic,
        partition:         partition,
        offset:            offset,
        PartitionConsumer: pcm,
    }, nil
}

func (c *consumerGroupClaim) Topic() string        { return c.topic }
func (c *consumerGroupClaim) Partition() int32     { return c.partition }
func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset }

// Drains messages and errors, ensures the claim is fully closed.
func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) {
    go func() {
        for range c.Messages() {
        }
    }()

    for err := range c.Errors() {
        errs = append(errs, err)
    }
    return
}
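// A minimal ConsumerGroupHandler sketch to illustrate the contract above; the
// exampleHandler type is illustrative and not part of this package. Setup and
// Cleanup are no-ops here, and ConsumeClaim loops until Messages() is closed
// by a rebalance:
//
//      type exampleHandler struct{}
//
//      func (exampleHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
//      func (exampleHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
//
//      func (exampleHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
//              for msg := range claim.Messages() {
//                      // process msg, then mark it so its offset can be committed
//                      sess.MarkMessage(msg, "")
//              }
//              return nil
//      }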