package sarama

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
	// Consume joins a cluster of consumers for a given list of topics and
	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
	//
	// The life-cycle of a session is represented by the following steps:
	//
	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
	//    and are assigned their "fair share" of partitions, aka 'claims'.
	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
	//    of the claims and allow any necessary preparation or alteration of state.
	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
	//    from concurrent reads/writes.
	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
	//    parent context is cancelled or when a server-side rebalance cycle is initiated.
	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
	//    to allow the user to perform any final tasks before a rebalance.
	// 6. Finally, marked offsets are committed one last time before claims are released.
	//
	// Please note that once a rebalance is triggered, sessions must be completed within
	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
	// commit failures.
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error
}
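
// A minimal usage sketch of the interface above. This is illustrative only:
// the broker address, group and topic names are placeholders, and handler is
// assumed to be a value implementing ConsumerGroupHandler. Consume is called
// in a loop because it returns whenever a server-side rebalance ends the
// current session.
//
//	config := NewConfig()
//	config.Version = V0_10_2_0 // consumer groups require at least this version
//
//	group, err := NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
//	if err != nil {
//		panic(err)
//	}
//	defer func() { _ = group.Close() }()
//
//	ctx := context.Background()
//	for {
//		if err := group.Consume(ctx, []string{"example-topic"}, handler); err != nil {
//			panic(err)
//		}
//	}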

type consumerGroup struct {
	client   Client
	config   *Config
	consumer Consumer
	groupID  string
	memberID string
	errors   chan error

	lock      sync.Mutex
	closed    chan none
	closeOnce sync.Once
}

// NewConsumerGroup creates a new consumer group with the given broker addresses and configuration.
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	c, err := newConsumerGroup(groupID, client)
	if err != nil {
		_ = client.Close()
	}
	return c, err
}

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
	// For clients passed in by the caller, ensure we don't
	// call Close() on them.
	cli := &nopCloserClient{client}
	return newConsumerGroup(groupID, cli)
}

func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
	config := client.Config()
	if !config.Version.IsAtLeast(V0_10_2_0) {
		return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
	}

	consumer, err := NewConsumerFromClient(client)
	if err != nil {
		return nil, err
	}

	return &consumerGroup{
		client:   client,
		consumer: consumer,
		config:   config,
		groupID:  groupID,
		errors:   make(chan error, config.ChannelBufferSize),
		closed:   make(chan none),
	}, nil
}

// Errors implements ConsumerGroup.
func (c *consumerGroup) Errors() <-chan error { return c.errors }

// Close implements ConsumerGroup.
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
		close(c.closed)

		c.lock.Lock()
		defer c.lock.Unlock()

		// leave group
		if e := c.leave(); e != nil {
			err = e
		}

		// drain errors
		go func() {
			close(c.errors)
		}()
		for e := range c.errors {
			err = e
		}

		if e := c.client.Close(); e != nil {
			err = e
		}
	})
	return
}

// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
	// Ensure group is not closed
	select {
	case <-c.closed:
		return ErrClosedConsumerGroup
	default:
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	// Quick exit when no topics are provided
	if len(topics) == 0 {
		return fmt.Errorf("no topics provided")
	}

	// Refresh metadata for requested topics
	if err := c.client.RefreshMetadata(topics...); err != nil {
		return err
	}

	// Init session
	sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
	if err == ErrClosedClient {
		return ErrClosedConsumerGroup
	} else if err != nil {
		return err
	}

	// Wait for session exit signal
	<-sess.ctx.Done()

	// Gracefully release session claims
	return sess.release(true)
}

func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
	select {
	case <-c.closed:
		return nil, ErrClosedConsumerGroup
	case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
	}

	if refreshCoordinator {
		err := c.client.RefreshCoordinator(c.groupID)
		if err != nil {
			return c.retryNewSession(ctx, topics, handler, retries, true)
		}
	}

	return c.newSession(ctx, topics, handler, retries-1)
}

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		if retries <= 0 {
			return nil, err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	}

	// Join consumer group
	join, err := c.joinGroupRequest(coordinator, topics)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch join.Err {
	case ErrNoError:
		c.memberID = join.MemberID
	case ErrUnknownMemberID, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, join.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, join.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, join.Err
	}

	// Prepare distribution plan if we joined as the leader
	var plan BalanceStrategyPlan
	if join.LeaderID == join.MemberID {
		members, err := join.GetMembers()
		if err != nil {
			return nil, err
		}

		plan, err = c.balance(members)
		if err != nil {
			return nil, err
		}
	}

	// Sync consumer group
	sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationID)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch sync.Err {
	case ErrNoError:
	case ErrUnknownMemberID, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, sync.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, sync.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, sync.Err
	}

	// Retrieve and sort claims
	var claims map[string][]int32
	if len(sync.MemberAssignment) > 0 {
		members, err := sync.GetMemberAssignment()
		if err != nil {
			return nil, err
		}
		claims = members.Topics
		for _, partitions := range claims {
			sort.Sort(int32Slice(partitions))
		}
	}

	return newConsumerGroupSession(ctx, c, claims, join.MemberID, join.GenerationID, handler)
}

func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
	req := &JoinGroupRequest{
		GroupID:        c.groupID,
		MemberID:       c.memberID,
		SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond),
		ProtocolType:   "consumer",
	}
	if c.config.Version.IsAtLeast(V0_10_1_0) {
		req.Version = 1
		req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
	}

	meta := &ConsumerGroupMemberMetadata{
		Topics:   topics,
		UserData: c.config.Consumer.Group.Member.UserData,
	}
	strategy := c.config.Consumer.Group.Rebalance.Strategy
	if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
		return nil, err
	}

	return coordinator.JoinGroup(req)
}

func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
	req := &SyncGroupRequest{
		GroupID:      c.groupID,
		MemberID:     c.memberID,
		GenerationID: generationID,
	}
	for memberID, topics := range plan {
		err := req.AddGroupAssignmentMember(memberID, &ConsumerGroupMemberAssignment{
			Topics: topics,
		})
		if err != nil {
			return nil, err
		}
	}
	return coordinator.SyncGroup(req)
}

func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
	req := &HeartbeatRequest{
		GroupID:      c.groupID,
		MemberID:     memberID,
		GenerationID: generationID,
	}
	return coordinator.Heartbeat(req)
}

func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
	topics := make(map[string][]int32)
	for _, meta := range members {
		for _, topic := range meta.Topics {
			topics[topic] = nil
		}
	}

	for topic := range topics {
		partitions, err := c.client.Partitions(topic)
		if err != nil {
			return nil, err
		}
		topics[topic] = partitions
	}

	strategy := c.config.Consumer.Group.Rebalance.Strategy
	return strategy.Plan(members, topics)
}

// Leaves the cluster, called by Close, protected by lock.
func (c *consumerGroup) leave() error {
	if c.memberID == "" {
		return nil
	}

	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		return err
	}

	resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
		GroupID:  c.groupID,
		MemberID: c.memberID,
	})
	if err != nil {
		_ = coordinator.Close()
		return err
	}

	// Unset memberID
	c.memberID = ""

	// Check response
	switch resp.Err {
	case ErrRebalanceInProgress, ErrUnknownMemberID, ErrNoError:
		return nil
	default:
		return resp.Err
	}
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
	select {
	case <-c.closed:
		return
	default:
	}

	if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
		err = &ConsumerError{
			Topic:     topic,
			Partition: partition,
			Err:       err,
		}
	}

	if c.config.Consumer.Return.Errors {
		select {
		case c.errors <- err:
		default:
			// drop the error if the buffered channel is full
		}
	} else {
		Logger.Println(err)
	}
}

// --------------------------------------------------------------------

// ConsumerGroupSession represents a consumer group member session.
type ConsumerGroupSession interface {
	// Claims returns information about the claimed partitions by topic.
	Claims() map[string][]int32

	// MemberID returns the cluster member ID.
	MemberID() string

	// GenerationID returns the current generation ID.
	GenerationID() int32

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(topic string, partition int32, offset int64, metadata string)

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, where MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(topic string, partition int32, offset int64, metadata string)

	// MarkMessage marks a message as consumed.
	MarkMessage(msg *ConsumerMessage, metadata string)

	// Context returns the session context.
	Context() context.Context
}
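
// A sketch of the offset convention described above, assuming sess is the
// current ConsumerGroupSession and msg is a *ConsumerMessage that has just
// been processed inside ConsumeClaim:
//
//	// mark the offset of the *next* message to read
//	sess.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, "")
//
//	// or, equivalently, let MarkMessage apply the +1 for you
//	sess.MarkMessage(msg, "")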

type consumerGroupSession struct {
	parent       *consumerGroup
	memberID     string
	generationID int32
	handler      ConsumerGroupHandler

	claims  map[string][]int32
	offsets *offsetManager
	ctx     context.Context
	cancel  func()

	waitGroup       sync.WaitGroup
	releaseOnce     sync.Once
	hbDying, hbDead chan none
}

func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
	// init offset manager
	offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
	if err != nil {
		return nil, err
	}

	// init context
	ctx, cancel := context.WithCancel(ctx)

	// init session
	sess := &consumerGroupSession{
		parent:       parent,
		memberID:     memberID,
		generationID: generationID,
		handler:      handler,
		offsets:      offsets,
		claims:       claims,
		ctx:          ctx,
		cancel:       cancel,
		hbDying:      make(chan none),
		hbDead:       make(chan none),
	}

	// start heartbeat loop
	go sess.heartbeatLoop()

	// create a POM for each claim
	for topic, partitions := range claims {
		for _, partition := range partitions {
			pom, err := offsets.ManagePartition(topic, partition)
			if err != nil {
				_ = sess.release(false)
				return nil, err
			}

			// handle POM errors
			go func(topic string, partition int32) {
				for err := range pom.Errors() {
					sess.parent.handleError(err, topic, partition)
				}
			}(topic, partition)
		}
	}

	// perform setup
	if err := handler.Setup(sess); err != nil {
		_ = sess.release(true)
		return nil, err
	}

	// start consuming
	for topic, partitions := range claims {
		for _, partition := range partitions {
			sess.waitGroup.Add(1)

			go func(topic string, partition int32) {
				defer sess.waitGroup.Done()

				// cancel the session as soon as the first
				// goroutine exits
				defer sess.cancel()

				// consume a single topic/partition, blocking
				sess.consume(topic, partition)
			}(topic, partition)
		}
	}
	return sess, nil
}

func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }
func (s *consumerGroupSession) MemberID() string           { return s.memberID }
func (s *consumerGroupSession) GenerationID() int32        { return s.generationID }

func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.MarkOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.ResetOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
	s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}

func (s *consumerGroupSession) Context() context.Context {
	return s.ctx
}

func (s *consumerGroupSession) consume(topic string, partition int32) {
	// quick exit if rebalance is due
	select {
	case <-s.ctx.Done():
		return
	case <-s.parent.closed:
		return
	default:
	}

	// get next offset
	offset := s.parent.config.Consumer.Offsets.Initial
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		offset, _ = pom.NextOffset()
	}

	// create new claim
	claim, err := newConsumerGroupClaim(s, topic, partition, offset)
	if err != nil {
		s.parent.handleError(err, topic, partition)
		return
	}

	// handle errors
	go func() {
		for err := range claim.Errors() {
			s.parent.handleError(err, topic, partition)
		}
	}()

	// trigger close when session is done
	go func() {
		select {
		case <-s.ctx.Done():
		case <-s.parent.closed:
		}
		claim.AsyncClose()
	}()

	// start processing
	if err := s.handler.ConsumeClaim(s, claim); err != nil {
		s.parent.handleError(err, topic, partition)
	}

	// ensure consumer is closed & drained
	claim.AsyncClose()
	for _, err := range claim.waitClosed() {
		s.parent.handleError(err, topic, partition)
	}
}

func (s *consumerGroupSession) release(withCleanup bool) (err error) {
	// signal release, stop heartbeat
	s.cancel()

	// wait for consumers to exit
	s.waitGroup.Wait()

	// perform release
	s.releaseOnce.Do(func() {
		if withCleanup {
			if e := s.handler.Cleanup(s); e != nil {
				s.parent.handleError(e, "", -1)
				err = e
			}
		}

		if e := s.offsets.Close(); e != nil {
			err = e
		}

		close(s.hbDying)
		<-s.hbDead
	})

	return
}

func (s *consumerGroupSession) heartbeatLoop() {
	defer close(s.hbDead)
	defer s.cancel() // trigger the end of the session on exit

	pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
	defer pause.Stop()

	retries := s.parent.config.Metadata.Retry.Max
	for {
		coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
		if err != nil {
			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			select {
			case <-s.hbDying:
				return
			case <-time.After(s.parent.config.Metadata.Retry.Backoff):
				retries--
			}
			continue
		}

		resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
		if err != nil {
			_ = coordinator.Close()

			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			retries--
			continue
		}

		switch resp.Err {
		case ErrNoError:
			retries = s.parent.config.Metadata.Retry.Max
		case ErrRebalanceInProgress, ErrUnknownMemberID, ErrIllegalGeneration:
			return
		default:
			// report the broker error; the transport error is nil at this point
			s.parent.handleError(resp.Err, "", -1)
			return
		}

		select {
		case <-pause.C:
		case <-s.hbDying:
			return
		}
	}
}

// --------------------------------------------------------------------

// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// It also provides hooks for your consumer group session life-cycle and allows you to
// trigger logic before or after the consume loop(s).
//
// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently,
// ensure that all state is safely protected against race conditions.
type ConsumerGroupHandler interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
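
// A minimal handler sketch for the interface above. The exampleHandler type
// is illustrative, not part of this package; it marks each message as
// consumed and exits once the Messages() channel closes, as ConsumeClaim
// requires:
//
//	type exampleHandler struct{}
//
//	func (exampleHandler) Setup(ConsumerGroupSession) error   { return nil }
//	func (exampleHandler) Cleanup(ConsumerGroupSession) error { return nil }
//
//	func (exampleHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
//		for msg := range claim.Messages() {
//			// process msg here, then mark it as consumed
//			sess.MarkMessage(msg, "")
//		}
//		return nil
//	}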

// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupClaim interface {
	// Topic returns the consumed topic name.
	Topic() string

	// Partition returns the consumed partition.
	Partition() int32

	// InitialOffset returns the initial offset that was used as a starting point for this claim.
	InitialOffset() int64

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64

	// Messages returns the read channel for the messages that are returned by
	// the broker. The messages channel will be closed when a new rebalance cycle
	// is due. You must finish processing and mark offsets within
	// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
	// re-assigned to another group member.
	Messages() <-chan *ConsumerMessage
}
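
// A sketch of the lag estimate mentioned in the HighWaterMarkOffset docs,
// assuming msg is the most recent message received from Messages():
//
//	// approximate number of messages still to be consumed after msg
//	lag := claim.HighWaterMarkOffset() - (msg.Offset + 1)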

type consumerGroupClaim struct {
	topic     string
	partition int32
	offset    int64
	PartitionConsumer
}

func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
	pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
	if err == ErrOffsetOutOfRange {
		// the stored offset is no longer valid; fall back to the configured initial offset
		offset = sess.parent.config.Consumer.Offsets.Initial
		pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
	}
	if err != nil {
		return nil, err
	}

	go func() {
		for err := range pcm.Errors() {
			sess.parent.handleError(err, topic, partition)
		}
	}()

	return &consumerGroupClaim{
		topic:             topic,
		partition:         partition,
		offset:            offset,
		PartitionConsumer: pcm,
	}, nil
}

func (c *consumerGroupClaim) Topic() string        { return c.topic }
func (c *consumerGroupClaim) Partition() int32     { return c.partition }
func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset }

// Drains messages and errors, ensures the claim is fully closed.
func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) {
	go func() {
		for range c.Messages() {
		}
	}()

	for err := range c.Errors() {
		errs = append(errs, err)
	}
	return
}