package sarama

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
	// Consume joins a cluster of consumers for a given list of topics and
	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
	//
	// The life-cycle of a session is represented by the following steps:
	//
	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
	//    and are assigned their "fair share" of partitions, aka 'claims'.
	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
	//    of the claims and allow any necessary preparation or alteration of state.
	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
	//    in a separate goroutine, which requires it to be thread-safe. Any state must be carefully protected
	//    from concurrent reads/writes.
	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
	//    parent context is cancelled or when a server-side rebalance cycle is initiated.
	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
	//    to allow the user to perform any final tasks before a rebalance.
	// 6. Finally, marked offsets are committed one last time before claims are released.
	//
	// Please note that once a rebalance is triggered, sessions must be completed within
	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
	// commit failures.
	//
	// A minimal usage sketch follows this interface definition.
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error
}
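// What follows is a minimal usage sketch, not part of the library API: it shows the
// typical Consume loop from a caller's point of view, assuming the package is imported
// as "sarama" and that exampleHandler is a user-defined type implementing
// ConsumerGroupHandler. The broker address, group ID and topic name are placeholders.
//
//	config := sarama.NewConfig()
//	config.Version = sarama.V2_0_0_0 // consumer groups require Version >= V0_10_2_0
//
//	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
//	if err != nil {
//		log.Fatalln(err)
//	}
//	defer func() { _ = group.Close() }()
//
//	ctx := context.Background()
//	for {
//		// Consume blocks for the duration of one session and returns when a
//		// rebalance happens; calling it again in a loop rejoins the group.
//		if err := group.Consume(ctx, []string{"example-topic"}, exampleHandler{}); err != nil {
//			log.Println("consume error:", err)
//		}
//		if ctx.Err() != nil {
//			break
//		}
//	}
//
// Re-calling Consume after it returns is deliberate: each return marks the end of one
// session (usually a rebalance), and the loop lets the member rejoin the group.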
type consumerGroup struct {
	client    Client
	config    *Config
	consumer  Consumer
	groupID   string
	memberID  string
	errors    chan error
	lock      sync.Mutex
	closed    chan none
	closeOnce sync.Once
	userData  []byte
}

// NewConsumerGroup creates a new consumer group for the given broker addresses and configuration.
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	c, err := newConsumerGroup(groupID, client)
	if err != nil {
		_ = client.Close()
	}
	return c, err
}

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
	// For clients passed in by the caller, ensure we don't
	// call Close() on them.
	cli := &nopCloserClient{client}
	return newConsumerGroup(groupID, cli)
}
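// What follows is a minimal, illustrative sketch of re-using an existing Client,
// not part of the library API. It assumes the package is imported as "sarama";
// the address and group ID are placeholders. Note that the caller remains
// responsible for closing the shared client.
//
//	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
//	if err != nil {
//		log.Fatalln(err)
//	}
//	defer func() { _ = client.Close() }() // the group will NOT close it for you
//
//	group, err := sarama.NewConsumerGroupFromClient("example-group", client)
//	if err != nil {
//		log.Fatalln(err)
//	}
//	defer func() { _ = group.Close() }()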
func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
	config := client.Config()
	if !config.Version.IsAtLeast(V0_10_2_0) {
		return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
	}

	consumer, err := NewConsumerFromClient(client)
	if err != nil {
		return nil, err
	}

	return &consumerGroup{
		client:   client,
		consumer: consumer,
		config:   config,
		groupID:  groupID,
		errors:   make(chan error, config.ChannelBufferSize),
		closed:   make(chan none),
	}, nil
}

// Errors implements ConsumerGroup.
func (c *consumerGroup) Errors() <-chan error { return c.errors }
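// What follows is a minimal sketch of custom error handling, not part of the
// library API. It assumes Consumer.Return.Errors was set to true on the config
// before the group was created, and that a group is in scope:
//
//	config.Consumer.Return.Errors = true
//	...
//	go func() {
//		for err := range group.Errors() {
//			log.Println("consumer group error:", err)
//		}
//	}()
//
// The errors channel is closed by Close(), so the goroutine above exits on shutdown.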
// Close implements ConsumerGroup.
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
		close(c.closed)

		c.lock.Lock()
		defer c.lock.Unlock()

		// leave group
		if e := c.leave(); e != nil {
			err = e
		}

		// drain errors
		go func() {
			close(c.errors)
		}()
		for e := range c.errors {
			err = e
		}

		if e := c.client.Close(); e != nil {
			err = e
		}
	})
	return
}
// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
	// Ensure group is not closed
	select {
	case <-c.closed:
		return ErrClosedConsumerGroup
	default:
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	// Quick exit when no topics are provided
	if len(topics) == 0 {
		return fmt.Errorf("no topics provided")
	}

	// Refresh metadata for requested topics
	if err := c.client.RefreshMetadata(topics...); err != nil {
		return err
	}

	// Init session
	sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
	if err == ErrClosedClient {
		return ErrClosedConsumerGroup
	} else if err != nil {
		return err
	}

	// Periodically check whether the number of partitions of any subscribed topic
	// has changed; a change cancels the session and triggers a rebalance.
	go c.loopCheckPartitionNumbers(topics, sess)

	// Wait for session exit signal
	<-sess.ctx.Done()

	// Gracefully release session claims
	return sess.release(true)
}
func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
	select {
	case <-c.closed:
		return nil, ErrClosedConsumerGroup
	case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
	}

	if refreshCoordinator {
		err := c.client.RefreshCoordinator(c.groupID)
		if err != nil {
			return c.retryNewSession(ctx, topics, handler, retries, true)
		}
	}

	return c.newSession(ctx, topics, handler, retries-1)
}

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		if retries <= 0 {
			return nil, err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	}

	// Join consumer group
	join, err := c.joinGroupRequest(coordinator, topics)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch join.Err {
	case ErrNoError:
		c.memberID = join.MemberId
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, join.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, join.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, join.Err
	}

	// Prepare distribution plan if we joined as the leader
	var plan BalanceStrategyPlan
	if join.LeaderId == join.MemberId {
		members, err := join.GetMembers()
		if err != nil {
			return nil, err
		}

		plan, err = c.balance(members)
		if err != nil {
			return nil, err
		}
	}

	// Sync consumer group
	sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch sync.Err {
	case ErrNoError:
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, sync.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, sync.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, sync.Err
	}

	// Retrieve and sort claims
	var claims map[string][]int32
	if len(sync.MemberAssignment) > 0 {
		members, err := sync.GetMemberAssignment()
		if err != nil {
			return nil, err
		}
		claims = members.Topics
		c.userData = members.UserData

		for _, partitions := range claims {
			sort.Sort(int32Slice(partitions))
		}
	}

	return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}

func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
	req := &JoinGroupRequest{
		GroupId:        c.groupID,
		MemberId:       c.memberID,
		SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond),
		ProtocolType:   "consumer",
	}
	if c.config.Version.IsAtLeast(V0_10_1_0) {
		req.Version = 1
		req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
	}

	// use static user-data if configured, otherwise use consumer-group userdata from the last sync
	userData := c.config.Consumer.Group.Member.UserData
	if len(userData) == 0 {
		userData = c.userData
	}
	meta := &ConsumerGroupMemberMetadata{
		Topics:   topics,
		UserData: userData,
	}
	strategy := c.config.Consumer.Group.Rebalance.Strategy
	if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
		return nil, err
	}

	return coordinator.JoinGroup(req)
}

func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
	req := &SyncGroupRequest{
		GroupId:      c.groupID,
		MemberId:     c.memberID,
		GenerationId: generationID,
	}
	for memberID, topics := range plan {
		assignment := &ConsumerGroupMemberAssignment{Topics: topics}

		// Include topic assignments in group-assignment userdata for each consumer-group member
		if c.config.Consumer.Group.Rebalance.Strategy.Name() == StickyBalanceStrategyName {
			userDataBytes, err := encode(&StickyAssignorUserDataV1{
				Topics:     topics,
				Generation: generationID,
			}, nil)
			if err != nil {
				return nil, err
			}
			assignment.UserData = userDataBytes
		}
		if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
			return nil, err
		}
	}
	return coordinator.SyncGroup(req)
}

func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
	req := &HeartbeatRequest{
		GroupId:      c.groupID,
		MemberId:     memberID,
		GenerationId: generationID,
	}
	return coordinator.Heartbeat(req)
}

func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
	topics := make(map[string][]int32)
	for _, meta := range members {
		for _, topic := range meta.Topics {
			topics[topic] = nil
		}
	}

	for topic := range topics {
		partitions, err := c.client.Partitions(topic)
		if err != nil {
			return nil, err
		}
		topics[topic] = partitions
	}

	strategy := c.config.Consumer.Group.Rebalance.Strategy
	return strategy.Plan(members, topics)
}
// Leaves the cluster, called by Close, protected by lock.
func (c *consumerGroup) leave() error {
	if c.memberID == "" {
		return nil
	}

	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		return err
	}

	resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
		GroupId:  c.groupID,
		MemberId: c.memberID,
	})
	if err != nil {
		_ = coordinator.Close()
		return err
	}

	// Unset memberID
	c.memberID = ""

	// Check response
	switch resp.Err {
	case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
		return nil
	default:
		return resp.Err
	}
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
	if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
		err = &ConsumerError{
			Topic:     topic,
			Partition: partition,
			Err:       err,
		}
	}

	if !c.config.Consumer.Return.Errors {
		Logger.Println(err)
		return
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	select {
	case <-c.closed:
		// consumer is closed
		return
	default:
	}

	select {
	case c.errors <- err:
	default:
		// no error listener
	}
}

func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
	pause := time.NewTicker(c.config.Consumer.Group.Heartbeat.Interval * 2)
	defer session.cancel()
	defer pause.Stop()

	var oldTopicToPartitionNum map[string]int
	var err error
	if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {
		return
	}
	for {
		if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil {
			return
		} else {
			for topic, num := range oldTopicToPartitionNum {
				if newTopicToPartitionNum[topic] != num {
					return // trigger the end of the session on exit
				}
			}
		}
		select {
		case <-pause.C:
		case <-c.closed:
			return
		}
	}
}

func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
	if err := c.client.RefreshMetadata(topics...); err != nil {
		Logger.Printf("Consumer Group refresh metadata failed %v", err)
		return nil, err
	}

	topicToPartitionNum := make(map[string]int, len(topics))
	for _, topic := range topics {
		if partitionNum, err := c.client.Partitions(topic); err != nil {
			Logger.Printf("Consumer Group topic %s get partition number failed %v", topic, err)
			return nil, err
		} else {
			topicToPartitionNum[topic] = len(partitionNum)
		}
	}
	return topicToPartitionNum, nil
}
// --------------------------------------------------------------------

// ConsumerGroupSession represents a consumer group member session.
type ConsumerGroupSession interface {
	// Claims returns information about the claimed partitions by topic.
	Claims() map[string][]int32

	// MemberID returns the cluster member ID.
	MemberID() string

	// GenerationID returns the current generation ID.
	GenerationID() int32

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(topic string, partition int32, offset int64, metadata string)

	// ResetOffset resets the offset to the provided value, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows resetting
	// an offset to an earlier or smaller value, whereas MarkOffset only allows
	// incrementing the offset. See MarkOffset for more details.
	ResetOffset(topic string, partition int32, offset int64, metadata string)

	// MarkMessage marks a message as consumed.
	MarkMessage(msg *ConsumerMessage, metadata string)

	// Context returns the session context.
	Context() context.Context
}
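// What follows is a minimal sketch of the marking convention, not part of the
// library API; it assumes a session sess and a consumed *ConsumerMessage msg are in scope.
//
//	// mark the NEXT offset to read, i.e. last consumed offset + 1 ...
//	sess.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, "")
//
//	// ... or, equivalently, let MarkMessage apply the +1 for you:
//	sess.MarkMessage(msg, "")
//
// MarkMessage is implemented as MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata),
// so the two calls above are interchangeable.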
type consumerGroupSession struct {
	parent          *consumerGroup
	memberID        string
	generationID    int32
	handler         ConsumerGroupHandler
	claims          map[string][]int32
	offsets         *offsetManager
	ctx             context.Context
	cancel          func()
	waitGroup       sync.WaitGroup
	releaseOnce     sync.Once
	hbDying, hbDead chan none
}

func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
	// init offset manager
	offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
	if err != nil {
		return nil, err
	}

	// init context
	ctx, cancel := context.WithCancel(ctx)

	// init session
	sess := &consumerGroupSession{
		parent:       parent,
		memberID:     memberID,
		generationID: generationID,
		handler:      handler,
		offsets:      offsets,
		claims:       claims,
		ctx:          ctx,
		cancel:       cancel,
		hbDying:      make(chan none),
		hbDead:       make(chan none),
	}

	// start heartbeat loop
	go sess.heartbeatLoop()

	// create a POM for each claim
	for topic, partitions := range claims {
		for _, partition := range partitions {
			pom, err := offsets.ManagePartition(topic, partition)
			if err != nil {
				_ = sess.release(false)
				return nil, err
			}

			// handle POM errors
			go func(topic string, partition int32) {
				for err := range pom.Errors() {
					sess.parent.handleError(err, topic, partition)
				}
			}(topic, partition)
		}
	}

	// perform setup
	if err := handler.Setup(sess); err != nil {
		_ = sess.release(true)
		return nil, err
	}

	// start consuming
	for topic, partitions := range claims {
		for _, partition := range partitions {
			sess.waitGroup.Add(1)

			go func(topic string, partition int32) {
				defer sess.waitGroup.Done()

				// cancel the session as soon as the first
				// goroutine exits
				defer sess.cancel()

				// consume a single topic/partition, blocking
				sess.consume(topic, partition)
			}(topic, partition)
		}
	}
	return sess, nil
}
func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }
func (s *consumerGroupSession) MemberID() string           { return s.memberID }
func (s *consumerGroupSession) GenerationID() int32        { return s.generationID }

func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.MarkOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.ResetOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
	s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}

func (s *consumerGroupSession) Context() context.Context {
	return s.ctx
}

func (s *consumerGroupSession) consume(topic string, partition int32) {
	// quick exit if rebalance is due
	select {
	case <-s.ctx.Done():
		return
	case <-s.parent.closed:
		return
	default:
	}

	// get next offset
	offset := s.parent.config.Consumer.Offsets.Initial
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		offset, _ = pom.NextOffset()
	}

	// create new claim
	claim, err := newConsumerGroupClaim(s, topic, partition, offset)
	if err != nil {
		s.parent.handleError(err, topic, partition)
		return
	}

	// handle errors
	go func() {
		for err := range claim.Errors() {
			s.parent.handleError(err, topic, partition)
		}
	}()

	// trigger close when session is done
	go func() {
		select {
		case <-s.ctx.Done():
		case <-s.parent.closed:
		}
		claim.AsyncClose()
	}()

	// start processing
	if err := s.handler.ConsumeClaim(s, claim); err != nil {
		s.parent.handleError(err, topic, partition)
	}

	// ensure consumer is closed & drained
	claim.AsyncClose()
	for _, err := range claim.waitClosed() {
		s.parent.handleError(err, topic, partition)
	}
}
func (s *consumerGroupSession) release(withCleanup bool) (err error) {
	// signal release, stop heartbeat
	s.cancel()

	// wait for consumers to exit
	s.waitGroup.Wait()

	// perform release
	s.releaseOnce.Do(func() {
		if withCleanup {
			if e := s.handler.Cleanup(s); e != nil {
				s.parent.handleError(e, "", -1)
				err = e
			}
		}

		if e := s.offsets.Close(); e != nil {
			err = e
		}

		close(s.hbDying)
		<-s.hbDead
	})

	return
}

func (s *consumerGroupSession) heartbeatLoop() {
	defer close(s.hbDead)
	defer s.cancel() // trigger the end of the session on exit

	pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
	defer pause.Stop()

	retries := s.parent.config.Metadata.Retry.Max
	for {
		coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
		if err != nil {
			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			select {
			case <-s.hbDying:
				return
			case <-time.After(s.parent.config.Metadata.Retry.Backoff):
				retries--
			}
			continue
		}

		resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
		if err != nil {
			_ = coordinator.Close()

			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			retries--
			continue
		}

		switch resp.Err {
		case ErrNoError:
			retries = s.parent.config.Metadata.Retry.Max
		case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
			return
		default:
			// report the broker-side error (err is nil at this point)
			s.parent.handleError(resp.Err, "", -1)
			return
		}

		select {
		case <-pause.C:
		case <-s.hbDying:
			return
		}
	}
}
// --------------------------------------------------------------------

// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// The interface also provides hooks for your consumer group session life-cycle and
// allows you to trigger logic before or after the consume loop(s).
//
// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently,
// so ensure that all state is safely protected against race conditions.
// A minimal example implementation follows this interface definition.
type ConsumerGroupHandler interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
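// What follows is a minimal, illustrative handler, not part of the library API:
// exampleHandler is an assumed name and the processing step is a placeholder.
// It satisfies ConsumerGroupHandler and marks each message after processing,
// assuming the package is imported as "sarama".
//
//	type exampleHandler struct{}
//
//	func (exampleHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
//	func (exampleHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
//
//	func (exampleHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
//		for msg := range claim.Messages() {
//			// process msg here, then mark it so the next offset gets committed
//			sess.MarkMessage(msg, "")
//		}
//		// returning once Messages() is closed lets the rebalance proceed
//		return nil
//	}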
// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupClaim interface {
	// Topic returns the consumed topic name.
	Topic() string

	// Partition returns the consumed partition.
	Partition() int32

	// InitialOffset returns the initial offset that was used as a starting point for this claim.
	InitialOffset() int64

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64

	// Messages returns the read channel for the messages that are returned by
	// the broker. The messages channel will be closed when a new rebalance cycle
	// is due. You must finish processing and mark offsets within
	// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
	// re-assigned to another group member.
	Messages() <-chan *ConsumerMessage
}
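// What follows is a minimal sketch of estimating consumer lag from inside
// ConsumeClaim, not part of the library API; msg and claim are assumed to be
// in scope as in the handler example above.
//
//	// HighWaterMarkOffset is the offset of the next message to be produced, so
//	// the number of messages still to be read after msg is roughly:
//	lag := claim.HighWaterMarkOffset() - msg.Offset - 1
//	log.Printf("%s/%d lag: %d", claim.Topic(), claim.Partition(), lag)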
type consumerGroupClaim struct {
	topic     string
	partition int32
	offset    int64
	PartitionConsumer
}

func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
	pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
	if err == ErrOffsetOutOfRange {
		offset = sess.parent.config.Consumer.Offsets.Initial
		pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
	}
	if err != nil {
		return nil, err
	}

	go func() {
		for err := range pcm.Errors() {
			sess.parent.handleError(err, topic, partition)
		}
	}()

	return &consumerGroupClaim{
		topic:             topic,
		partition:         partition,
		offset:            offset,
		PartitionConsumer: pcm,
	}, nil
}

func (c *consumerGroupClaim) Topic() string        { return c.topic }
func (c *consumerGroupClaim) Partition() int32     { return c.partition }
func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset }

// Drains messages and errors, ensures the claim is fully closed.
func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) {
	go func() {
		for range c.Messages() {
		}
	}()

	for err := range c.Errors() {
		errs = append(errs, err)
	}
	return
}