consumer_group.go

package sarama

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
	// Consume joins a cluster of consumers for a given list of topics and
	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
	//
	// The life-cycle of a session is represented by the following steps:
	//
	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
	//    and are assigned their "fair share" of partitions, aka 'claims'.
	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
	//    of the claims and allow any necessary preparation or alteration of state.
	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
	//    from concurrent reads/writes.
	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
	//    parent context is cancelled or when a server-side rebalance cycle is initiated.
	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
	//    to allow the user to perform any final tasks before a rebalance.
	// 6. Finally, marked offsets are committed one last time before claims are released.
	//
	// Please note that once a rebalance is triggered, sessions must be completed within
	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
	// commit failures.
	//
	// This method should be called inside an infinite loop; when a
	// server-side rebalance happens, the consumer session will need to be
	// recreated to get the new claims (see the usage sketch after this interface).
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error
}
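// The function below is an illustrative sketch, not part of the library API: it
// shows the call pattern described above, i.e. driving Consume from an infinite
// loop so that a fresh session (and fresh claims) is created after every
// server-side rebalance. The name consumeLoop is an assumption made for this
// example only.
func consumeLoop(ctx context.Context, group ConsumerGroup, topics []string, handler ConsumerGroupHandler) error {
	for {
		// Consume blocks for the lifetime of one session and returns when the
		// session ends, typically because a rebalance was triggered.
		if err := group.Consume(ctx, topics, handler); err != nil {
			return err
		}
		// Stop looping once the parent context has been cancelled.
		if err := ctx.Err(); err != nil {
			return err
		}
	}
}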
type consumerGroup struct {
	client   Client
	config   *Config
	consumer Consumer
	groupID  string
	memberID string
	errors   chan error

	lock      sync.Mutex
	closed    chan none
	closeOnce sync.Once

	userData []byte
}

// NewConsumerGroup creates a new consumer group with the given broker addresses and configuration.
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	c, err := newConsumerGroup(groupID, client)
	if err != nil {
		_ = client.Close()
	}
	return c, err
}

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
	// For clients passed in by the caller, ensure we don't
	// call Close() on them.
	cli := &nopCloserClient{client}
	return newConsumerGroup(groupID, cli)
}

func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
	config := client.Config()
	if !config.Version.IsAtLeast(V0_10_2_0) {
		return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
	}

	consumer, err := NewConsumerFromClient(client)
	if err != nil {
		return nil, err
	}

	return &consumerGroup{
		client:   client,
		consumer: consumer,
		config:   config,
		groupID:  groupID,
		errors:   make(chan error, config.ChannelBufferSize),
		closed:   make(chan none),
	}, nil
}
// Errors implements ConsumerGroup.
func (c *consumerGroup) Errors() <-chan error { return c.errors }

// Close implements ConsumerGroup.
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
		close(c.closed)

		// leave group
		if e := c.leave(); e != nil {
			err = e
		}

		// drain errors
		go func() {
			close(c.errors)
		}()
		for e := range c.errors {
			err = e
		}

		if e := c.client.Close(); e != nil {
			err = e
		}
	})
	return
}

// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
	// Ensure group is not closed
	select {
	case <-c.closed:
		return ErrClosedConsumerGroup
	default:
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	// Quick exit when no topics are provided
	if len(topics) == 0 {
		return fmt.Errorf("no topics provided")
	}

	// Refresh metadata for requested topics
	if err := c.client.RefreshMetadata(topics...); err != nil {
		return err
	}

	// Init session
	sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
	if err == ErrClosedClient {
		return ErrClosedConsumerGroup
	} else if err != nil {
		return err
	}

	// Periodically check whether the number of partitions for any of the subscribed
	// topics has changed; a change cancels the session and thereby triggers a rebalance.
	// The goroutine exits together with the session, so repeated Consume calls do not
	// accumulate loopCheckPartitionNumbers goroutines.
	go c.loopCheckPartitionNumbers(topics, sess)

	// Wait for session exit signal
	<-sess.ctx.Done()

	// Gracefully release session claims
	return sess.release(true)
}
func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
	select {
	case <-c.closed:
		return nil, ErrClosedConsumerGroup
	case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
	}

	if refreshCoordinator {
		err := c.client.RefreshCoordinator(c.groupID)
		if err != nil {
			return c.retryNewSession(ctx, topics, handler, retries, true)
		}
	}

	return c.newSession(ctx, topics, handler, retries-1)
}

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		if retries <= 0 {
			return nil, err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	}

	// Join consumer group
	join, err := c.joinGroupRequest(coordinator, topics)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch join.Err {
	case ErrNoError:
		c.memberID = join.MemberId
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, join.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, join.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, join.Err
	}

	// Prepare distribution plan if we joined as the leader
	var plan BalanceStrategyPlan
	if join.LeaderId == join.MemberId {
		members, err := join.GetMembers()
		if err != nil {
			return nil, err
		}

		plan, err = c.balance(members)
		if err != nil {
			return nil, err
		}
	}

	// Sync consumer group
	groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch groupRequest.Err {
	case ErrNoError:
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, groupRequest.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, groupRequest.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, groupRequest.Err
	}

	// Retrieve and sort claims
	var claims map[string][]int32
	if len(groupRequest.MemberAssignment) > 0 {
		members, err := groupRequest.GetMemberAssignment()
		if err != nil {
			return nil, err
		}
		claims = members.Topics
		c.userData = members.UserData

		for _, partitions := range claims {
			sort.Sort(int32Slice(partitions))
		}
	}

	return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}
func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
	req := &JoinGroupRequest{
		GroupId:        c.groupID,
		MemberId:       c.memberID,
		SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond),
		ProtocolType:   "consumer",
	}
	if c.config.Version.IsAtLeast(V0_10_1_0) {
		req.Version = 1
		req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
	}

	// use static user-data if configured, otherwise use consumer-group userdata from the last sync
	userData := c.config.Consumer.Group.Member.UserData
	if len(userData) == 0 {
		userData = c.userData
	}
	meta := &ConsumerGroupMemberMetadata{
		Topics:   topics,
		UserData: userData,
	}
	strategy := c.config.Consumer.Group.Rebalance.Strategy
	if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
		return nil, err
	}

	return coordinator.JoinGroup(req)
}

func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
	req := &SyncGroupRequest{
		GroupId:      c.groupID,
		MemberId:     c.memberID,
		GenerationId: generationID,
	}
	strategy := c.config.Consumer.Group.Rebalance.Strategy
	for memberID, topics := range plan {
		assignment := &ConsumerGroupMemberAssignment{Topics: topics}
		userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
		if err != nil {
			return nil, err
		}
		assignment.UserData = userDataBytes
		if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
			return nil, err
		}
	}
	return coordinator.SyncGroup(req)
}

func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
	req := &HeartbeatRequest{
		GroupId:      c.groupID,
		MemberId:     memberID,
		GenerationId: generationID,
	}
	return coordinator.Heartbeat(req)
}

func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
	topics := make(map[string][]int32)
	for _, meta := range members {
		for _, topic := range meta.Topics {
			topics[topic] = nil
		}
	}

	for topic := range topics {
		partitions, err := c.client.Partitions(topic)
		if err != nil {
			return nil, err
		}
		topics[topic] = partitions
	}

	strategy := c.config.Consumer.Group.Rebalance.Strategy
	return strategy.Plan(members, topics)
}
// Leaves the cluster, called by Close.
func (c *consumerGroup) leave() error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.memberID == "" {
		return nil
	}

	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		return err
	}

	resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
		GroupId:  c.groupID,
		MemberId: c.memberID,
	})
	if err != nil {
		_ = coordinator.Close()
		return err
	}

	// Unset memberID
	c.memberID = ""

	// Check response
	switch resp.Err {
	case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
		return nil
	default:
		return resp.Err
	}
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
	if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
		err = &ConsumerError{
			Topic:     topic,
			Partition: partition,
			Err:       err,
		}
	}

	if !c.config.Consumer.Return.Errors {
		Logger.Println(err)
		return
	}

	select {
	case <-c.closed:
		// consumer is closed
		return
	default:
	}

	select {
	case c.errors <- err:
	default:
		// no error listener
	}
}
func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
	pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
	defer session.cancel()
	defer pause.Stop()
	var oldTopicToPartitionNum map[string]int
	var err error
	if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {
		return
	}
	for {
		if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil {
			return
		} else {
			for topic, num := range oldTopicToPartitionNum {
				if newTopicToPartitionNum[topic] != num {
					return // trigger the end of the session on exit
				}
			}
		}
		select {
		case <-pause.C:
		case <-session.ctx.Done():
			Logger.Printf("partition number check goroutine will exit, topics %s", topics)
			// the session was closed elsewhere, so exit here as well
			return
		case <-c.closed:
			return
		}
	}
}

func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
	topicToPartitionNum := make(map[string]int, len(topics))
	for _, topic := range topics {
		if partitions, err := c.client.Partitions(topic); err != nil {
			Logger.Printf("Consumer Group failed to get the partition number for topic %s: %v", topic, err)
			return nil, err
		} else {
			topicToPartitionNum[topic] = len(partitions)
		}
	}
	return topicToPartitionNum, nil
}
// --------------------------------------------------------------------

// ConsumerGroupSession represents a consumer group member session.
type ConsumerGroupSession interface {
	// Claims returns information about the claimed partitions by topic.
	Claims() map[string][]int32

	// MemberID returns the cluster member ID.
	MemberID() string

	// GenerationID returns the current generation ID.
	GenerationID() int32

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(topic string, partition int32, offset int64, metadata string)

	// Commit the offset to the backend.
	//
	// Note: calling Commit performs a blocking synchronous operation.
	Commit()

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, where MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(topic string, partition int32, offset int64, metadata string)

	// MarkMessage marks a message as consumed.
	MarkMessage(msg *ConsumerMessage, metadata string)

	// Context returns the session context.
	Context() context.Context
}
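// The function below is an illustrative sketch, not part of the library API: it
// demonstrates the offset-marking convention documented on MarkOffset above.
// MarkMessage applies the +1 convention for you; the equivalent explicit
// MarkOffset call is shown alongside it. The name markExample is an assumption
// made for this example only.
func markExample(sess ConsumerGroupSession, msg *ConsumerMessage) {
	// Preferred: MarkMessage marks msg.Offset+1, i.e. the next message to read.
	sess.MarkMessage(msg, "")

	// Equivalent explicit form of the same convention.
	sess.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, "")
}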
type consumerGroupSession struct {
	parent       *consumerGroup
	memberID     string
	generationID int32
	handler      ConsumerGroupHandler

	claims  map[string][]int32
	offsets *offsetManager
	ctx     context.Context
	cancel  func()

	waitGroup       sync.WaitGroup
	releaseOnce     sync.Once
	hbDying, hbDead chan none
}

func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
	// init offset manager
	offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
	if err != nil {
		return nil, err
	}

	// init context
	ctx, cancel := context.WithCancel(ctx)

	// init session
	sess := &consumerGroupSession{
		parent:       parent,
		memberID:     memberID,
		generationID: generationID,
		handler:      handler,
		offsets:      offsets,
		claims:       claims,
		ctx:          ctx,
		cancel:       cancel,
		hbDying:      make(chan none),
		hbDead:       make(chan none),
	}

	// start heartbeat loop
	go sess.heartbeatLoop()

	// create a POM for each claim
	for topic, partitions := range claims {
		for _, partition := range partitions {
			pom, err := offsets.ManagePartition(topic, partition)
			if err != nil {
				_ = sess.release(false)
				return nil, err
			}

			// handle POM errors
			go func(topic string, partition int32) {
				for err := range pom.Errors() {
					sess.parent.handleError(err, topic, partition)
				}
			}(topic, partition)
		}
	}

	// perform setup
	if err := handler.Setup(sess); err != nil {
		_ = sess.release(true)
		return nil, err
	}

	// start consuming
	for topic, partitions := range claims {
		for _, partition := range partitions {
			sess.waitGroup.Add(1)

			go func(topic string, partition int32) {
				defer sess.waitGroup.Done()

				// cancel the session as soon as the first
				// goroutine exits
				defer sess.cancel()

				// consume a single topic/partition, blocking
				sess.consume(topic, partition)
			}(topic, partition)
		}
	}
	return sess, nil
}
func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }
func (s *consumerGroupSession) MemberID() string           { return s.memberID }
func (s *consumerGroupSession) GenerationID() int32        { return s.generationID }

func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.MarkOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) Commit() {
	s.offsets.Commit()
}

func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.ResetOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
	s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}

func (s *consumerGroupSession) Context() context.Context {
	return s.ctx
}

func (s *consumerGroupSession) consume(topic string, partition int32) {
	// quick exit if rebalance is due
	select {
	case <-s.ctx.Done():
		return
	case <-s.parent.closed:
		return
	default:
	}

	// get next offset
	offset := s.parent.config.Consumer.Offsets.Initial
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		offset, _ = pom.NextOffset()
	}

	// create new claim
	claim, err := newConsumerGroupClaim(s, topic, partition, offset)
	if err != nil {
		s.parent.handleError(err, topic, partition)
		return
	}

	// handle errors
	go func() {
		for err := range claim.Errors() {
			s.parent.handleError(err, topic, partition)
		}
	}()

	// trigger close when session is done
	go func() {
		select {
		case <-s.ctx.Done():
		case <-s.parent.closed:
		}
		claim.AsyncClose()
	}()

	// start processing
	if err := s.handler.ConsumeClaim(s, claim); err != nil {
		s.parent.handleError(err, topic, partition)
	}

	// ensure consumer is closed & drained
	claim.AsyncClose()
	for _, err := range claim.waitClosed() {
		s.parent.handleError(err, topic, partition)
	}
}
func (s *consumerGroupSession) release(withCleanup bool) (err error) {
	// signal release, stop heartbeat
	s.cancel()

	// wait for consumers to exit
	s.waitGroup.Wait()

	// perform release
	s.releaseOnce.Do(func() {
		if withCleanup {
			if e := s.handler.Cleanup(s); e != nil {
				s.parent.handleError(e, "", -1)
				err = e
			}
		}

		if e := s.offsets.Close(); e != nil {
			err = e
		}

		close(s.hbDying)
		<-s.hbDead
	})

	return
}

func (s *consumerGroupSession) heartbeatLoop() {
	defer close(s.hbDead)
	defer s.cancel() // trigger the end of the session on exit
	pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
	defer pause.Stop()

	retries := s.parent.config.Metadata.Retry.Max
	for {
		coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
		if err != nil {
			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			select {
			case <-s.hbDying:
				return
			case <-time.After(s.parent.config.Metadata.Retry.Backoff):
				retries--
			}
			continue
		}

		resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
		if err != nil {
			_ = coordinator.Close()

			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			retries--
			continue
		}

		switch resp.Err {
		case ErrNoError:
			retries = s.parent.config.Metadata.Retry.Max
		case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
			return
		default:
			s.parent.handleError(resp.Err, "", -1)
			return
		}

		select {
		case <-pause.C:
		case <-s.hbDying:
			return
		}
	}
}
// --------------------------------------------------------------------

// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// It also provides hooks for your consumer group session life-cycle and allows you to
// trigger logic before or after the consume loop(s).
//
// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently;
// ensure that all state is safely protected against race conditions.
type ConsumerGroupHandler interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
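// The type below is an illustrative sketch of a ConsumerGroupHandler
// implementation, not part of the library: the name exampleHandler is an
// assumption made for this example only. It follows the contract described
// above: ConsumeClaim loops until the Messages() channel is closed by the next
// rebalance, marking each message as consumed along the way.
type exampleHandler struct{}

func (exampleHandler) Setup(ConsumerGroupSession) error   { return nil }
func (exampleHandler) Cleanup(ConsumerGroupSession) error { return nil }

func (exampleHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		// Process the message here, then mark it so its offset can be committed.
		sess.MarkMessage(msg, "")
	}
	// Returning once Messages() is closed lets the session shut down in time
	// for Cleanup() and the final offset commit.
	return nil
}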
// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupClaim interface {
	// Topic returns the consumed topic name.
	Topic() string

	// Partition returns the consumed partition.
	Partition() int32

	// InitialOffset returns the initial offset that was used as a starting point for this claim.
	InitialOffset() int64

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64

	// Messages returns the read channel for the messages that are returned by
	// the broker. The messages channel will be closed when a new rebalance cycle
	// is due. You must finish processing and mark offsets within
	// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
	// re-assigned to another group member.
	Messages() <-chan *ConsumerMessage
}
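// The function below is an illustrative sketch, not part of the library: it uses
// HighWaterMarkOffset as described above to estimate how far behind processing
// is. The name claimLag and the lastConsumedOffset parameter are assumptions
// made for this example only.
func claimLag(claim ConsumerGroupClaim, lastConsumedOffset int64) int64 {
	// The high water mark is the offset that will be assigned to the next
	// produced message, so the remaining backlog is the gap between it and the
	// next offset this consumer would read.
	return claim.HighWaterMarkOffset() - (lastConsumedOffset + 1)
}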
type consumerGroupClaim struct {
	topic     string
	partition int32
	offset    int64
	PartitionConsumer
}

func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
	pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
	if err == ErrOffsetOutOfRange {
		offset = sess.parent.config.Consumer.Offsets.Initial
		pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
	}
	if err != nil {
		return nil, err
	}

	go func() {
		for err := range pcm.Errors() {
			sess.parent.handleError(err, topic, partition)
		}
	}()

	return &consumerGroupClaim{
		topic:             topic,
		partition:         partition,
		offset:            offset,
		PartitionConsumer: pcm,
	}, nil
}

func (c *consumerGroupClaim) Topic() string        { return c.topic }
func (c *consumerGroupClaim) Partition() int32     { return c.partition }
func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset }

// Drains messages and errors, ensures the claim is fully closed.
func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) {
	go func() {
		for range c.Messages() {
		}
	}()

	for err := range c.Errors() {
		errs = append(errs, err)
	}
	return
}