consumer_group.go

package sarama

import (
    "context"
    "errors"
    "fmt"
    "sort"
    "sync"
    "time"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
    // Consume joins a cluster of consumers for a given list of topics and
    // starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
    //
    // The life-cycle of a session is represented by the following steps:
    //
    // 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
    //    and are assigned their "fair share" of partitions, aka 'claims'.
    // 2. Before processing starts, the handler's Setup() hook is called to notify the user
    //    of the claims and allow any necessary preparation or alteration of state.
    // 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
    //    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
    //    from concurrent reads/writes.
    // 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
    //    parent context is cancelled or when a server-side rebalance cycle is initiated.
    // 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
    //    to allow the user to perform any final tasks before a rebalance.
    // 6. Finally, marked offsets are committed one last time before claims are released.
    //
    // Please note that once a rebalance is triggered, sessions must be completed within
    // Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
    // as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
    // is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
    // commit failures. See the usage sketch after this interface for a typical consume loop.
    Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

    // Errors returns a read channel of errors that occurred during the consumer life-cycle.
    // By default, errors are logged and not returned over this channel.
    // If you want to implement any custom error handling, set your config's
    // Consumer.Return.Errors setting to true, and read from this channel.
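    //
    // A minimal sketch, assuming "group" is an instance of this ConsumerGroup
    // and Consumer.Return.Errors was set to true in its configuration:
    //
    //    go func() {
    //        for err := range group.Errors() {
    //            log.Println("consumer group error:", err) // stdlib logger, for illustration only
    //        }
    //    }()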
    Errors() <-chan error

    // Close stops the ConsumerGroup and detaches any running sessions. It is required to call
    // this function before the object passes out of scope, as it will otherwise leak memory.
    Close() error
}
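
// A minimal usage sketch, not part of the package API: Consume returns whenever
// a server-side rebalance ends the session, so it is typically called in a loop.
// "addrs", "config", "ctx" and "myHandler" are assumed to be supplied by the
// caller; myHandler is a hypothetical ConsumerGroupHandler implementation.
//
//    group, err := NewConsumerGroup(addrs, "example-group", config)
//    if err != nil {
//        return err
//    }
//    defer group.Close()
//
//    for {
//        // a session runs until rebalance or cancellation; re-join afterwards
//        if err := group.Consume(ctx, []string{"example-topic"}, myHandler); err != nil {
//            return err
//        }
//        if ctx.Err() != nil {
//            return ctx.Err() // the parent context was cancelled; stop re-joining
//        }
//    }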

type consumerGroup struct {
    client    Client
    ownClient bool

    config   *Config
    consumer Consumer
    groupID  string
    memberID string
    errors   chan error

    lock      sync.Mutex
    closed    chan none
    closeOnce sync.Once
}

// NewConsumerGroup creates a new consumer group with the given broker addresses and configuration.
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
    client, err := NewClient(addrs, config)
    if err != nil {
        return nil, err
    }

    c, err := NewConsumerGroupFromClient(groupID, client)
    if err != nil {
        _ = client.Close()
        return nil, err
    }
    c.(*consumerGroup).ownClient = true
    return c, nil
}

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
    config := client.Config()
    if !config.Version.IsAtLeast(V0_10_2_0) {
        return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
    }

    consumer, err := NewConsumerFromClient(client)
    if err != nil {
        return nil, err
    }

    return &consumerGroup{
        client:   client,
        consumer: consumer,
        config:   config,
        groupID:  groupID,
        errors:   make(chan error, config.ChannelBufferSize),
        closed:   make(chan none),
    }, nil
}
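
// A hedged sketch of the re-use rule above: a single client may be re-used by
// other components of the same process, but must not be shared between two
// consumer groups. "addrs" is assumed to be supplied by the caller; note the
// version requirement enforced above.
//
//    config := NewConfig()
//    config.Version = V0_10_2_0 // consumer groups require at least this version
//    client, err := NewClient(addrs, config)
//    if err != nil {
//        return err
//    }
//    defer client.Close() // the caller owns the client here
//
//    group, err := NewConsumerGroupFromClient("example-group", client)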

// Errors implements ConsumerGroup.
func (c *consumerGroup) Errors() <-chan error { return c.errors }

// Close implements ConsumerGroup.
func (c *consumerGroup) Close() (err error) {
    c.closeOnce.Do(func() {
        close(c.closed)

        c.lock.Lock()
        defer c.lock.Unlock()

        // leave group
        if e := c.leave(); e != nil {
            err = e
        }

        // drain errors
        go func() {
            close(c.errors)
        }()
        for e := range c.errors {
            err = e
        }

        if c.ownClient {
            if e := c.client.Close(); e != nil {
                err = e
            }
        }
    })
    return
}

// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
    // Ensure group is not closed
    select {
    case <-c.closed:
        return ErrClosedConsumerGroup
    default:
    }

    c.lock.Lock()
    defer c.lock.Unlock()

    // Quick exit when no topics are provided
    if len(topics) == 0 {
        return fmt.Errorf("no topics provided")
    }

    // Refresh metadata for requested topics
    if err := c.client.RefreshMetadata(topics...); err != nil {
        return err
    }

    // Get coordinator
    coordinator, err := c.client.Coordinator(c.groupID)
    if err != nil {
        return err
    }

    // Init session
    sess, err := c.newSession(ctx, coordinator, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
    if err == ErrClosedClient {
        return ErrClosedConsumerGroup
    } else if err != nil {
        return err
    }

    // Wait for session exit signal
    <-sess.ctx.Done()

    // Gracefully release session claims
    return sess.release(true)
}

func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
    // Join consumer group
    join, err := c.joinGroupRequest(coordinator, topics)
    if err != nil {
        _ = coordinator.Close()
        return nil, err
    }
    switch join.Err {
    case ErrNoError:
        c.memberID = join.MemberId
    case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
        c.memberID = ""
        return c.newSession(ctx, coordinator, topics, handler, retries)
    case ErrRebalanceInProgress: // retry after backoff
        if retries <= 0 {
            return nil, join.Err
        }

        select {
        case <-c.closed:
            return nil, ErrClosedConsumerGroup
        case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
        }

        return c.newSession(ctx, coordinator, topics, handler, retries-1)
    default:
        return nil, join.Err
    }

    // Prepare distribution plan if we joined as the leader
    var plan BalanceStrategyPlan
    if join.LeaderId == join.MemberId {
        members, err := join.GetMembers()
        if err != nil {
            return nil, err
        }

        plan, err = c.balance(members)
        if err != nil {
            return nil, err
        }
    }

    // Sync consumer group
    sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
    if err != nil {
        _ = coordinator.Close()
        return nil, err
    }
    switch sync.Err {
    case ErrNoError:
    case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
        c.memberID = ""
        return c.newSession(ctx, coordinator, topics, handler, retries)
    case ErrRebalanceInProgress: // retry after backoff
        if retries <= 0 {
            return nil, sync.Err
        }

        select {
        case <-c.closed:
            return nil, ErrClosedConsumerGroup
        case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
        }

        return c.newSession(ctx, coordinator, topics, handler, retries-1)
    default:
        return nil, sync.Err
    }

    // Retrieve and sort claims
    var claims map[string][]int32
    if len(sync.MemberAssignment) > 0 {
        members, err := sync.GetMemberAssignment()
        if err != nil {
            return nil, err
        }
        claims = members.Topics

        for _, partitions := range claims {
            sort.Sort(int32Slice(partitions))
        }
    }

    return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}

func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
    req := &JoinGroupRequest{
        GroupId:        c.groupID,
        MemberId:       c.memberID,
        SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond),
        ProtocolType:   "consumer",
    }
    if c.config.Version.IsAtLeast(V0_10_1_0) {
        req.Version = 1
        req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
    }

    meta := &ConsumerGroupMemberMetadata{
        Topics:   topics,
        UserData: c.config.Consumer.Group.Member.UserData,
    }
    strategy := c.config.Consumer.Group.Rebalance.Strategy
    if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
        return nil, err
    }

    return coordinator.JoinGroup(req)
}

func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
    req := &SyncGroupRequest{
        GroupId:      c.groupID,
        MemberId:     c.memberID,
        GenerationId: generationID,
    }
    for memberID, topics := range plan {
        err := req.AddGroupAssignmentMember(memberID, &ConsumerGroupMemberAssignment{
            Topics: topics,
        })
        if err != nil {
            return nil, err
        }
    }
    return coordinator.SyncGroup(req)
}

func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
    req := &HeartbeatRequest{
        GroupId:      c.groupID,
        MemberId:     memberID,
        GenerationId: generationID,
    }
    return coordinator.Heartbeat(req)
}

func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
    topics := make(map[string][]int32)
    for _, meta := range members {
        for _, topic := range meta.Topics {
            topics[topic] = nil
        }
    }

    for topic := range topics {
        partitions, err := c.client.Partitions(topic)
        if err != nil {
            return nil, err
        }
        topics[topic] = partitions
    }

    strategy := c.config.Consumer.Group.Rebalance.Strategy
    return strategy.Plan(members, topics)
}

// Leaves the cluster, called by Close, protected by lock.
func (c *consumerGroup) leave() error {
    if c.memberID == "" {
        return nil
    }

    coordinator, err := c.client.Coordinator(c.groupID)
    if err != nil {
        return err
    }

    resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
        GroupId:  c.groupID,
        MemberId: c.memberID,
    })
    if err != nil {
        _ = coordinator.Close()
        return err
    }

    // Unset memberID
    c.memberID = ""

    // Check response
    switch resp.Err {
    case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
        return nil
    default:
        return resp.Err
    }
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
    select {
    case <-c.closed:
        return
    default:
    }

    if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
        err = &ConsumerError{
            Topic:     topic,
            Partition: partition,
            Err:       err,
        }
    }

    if c.config.Consumer.Return.Errors {
        select {
        case c.errors <- err:
        default:
        }
    } else {
        Logger.Println(err)
    }
}

// --------------------------------------------------------------------

// ConsumerGroupSession represents a consumer group member session.
type ConsumerGroupSession interface {
    // Claims returns information about the claimed partitions by topic.
    Claims() map[string][]int32

    // MemberID returns the cluster member ID.
    MemberID() string

    // GenerationID returns the current generation ID.
    GenerationID() int32

    // MarkOffset marks the provided offset, alongside a metadata string
    // that represents the state of the partition consumer at that point in time. The
    // metadata string can be used by another consumer to restore that state, so it
    // can resume consumption.
    //
    // To follow upstream conventions, you are expected to mark the offset of the
    // next message to read, not the last message read. Thus, when calling `MarkOffset`
    // you should typically add one to the offset of the last consumed message.
    //
    // Note: calling MarkOffset does not necessarily commit the offset to the backend
    // store immediately for efficiency reasons, and it may never be committed if
    // your application crashes. This means that you may end up processing the same
    // message twice, and your processing should ideally be idempotent.
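    //
    // For example (a sketch; "msg" is the last consumed *ConsumerMessage):
    //
    //    sess.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, "")
    //
    // which is exactly what MarkMessage does on your behalf.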
    MarkOffset(topic string, partition int32, offset int64, metadata string)

    // ResetOffset resets to the provided offset, alongside a metadata string that
    // represents the state of the partition consumer at that point in time. Reset
    // acts as a counterpart to MarkOffset, the difference being that it allows
    // resetting an offset to an earlier or smaller value, where MarkOffset only
    // allows incrementing the offset. See MarkOffset for more details.
    ResetOffset(topic string, partition int32, offset int64, metadata string)

    // MarkMessage marks a message as consumed.
    MarkMessage(msg *ConsumerMessage, metadata string)

    // Context returns the session context.
    Context() context.Context
}

type consumerGroupSession struct {
    parent       *consumerGroup
    memberID     string
    generationID int32
    handler      ConsumerGroupHandler

    claims  map[string][]int32
    offsets *offsetManager
    ctx     context.Context
    cancel  func()

    waitGroup       sync.WaitGroup
    releaseOnce     sync.Once
    hbDying, hbDead chan none
}

func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
    // init offset manager
    offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
    if err != nil {
        return nil, err
    }

    // init context
    ctx, cancel := context.WithCancel(ctx)

    // init session
    sess := &consumerGroupSession{
        parent:       parent,
        memberID:     memberID,
        generationID: generationID,
        handler:      handler,
        offsets:      offsets,
        claims:       claims,
        ctx:          ctx,
        cancel:       cancel,
        hbDying:      make(chan none),
        hbDead:       make(chan none),
    }

    // start heartbeat loop
    go sess.heartbeatLoop()

    // create a POM for each claim
    for topic, partitions := range claims {
        for _, partition := range partitions {
            pom, err := offsets.ManagePartition(topic, partition)
            if err != nil {
                _ = sess.release(false)
                return nil, err
            }

            // handle POM errors
            go func(topic string, partition int32) {
                for err := range pom.Errors() {
                    sess.parent.handleError(err, topic, partition)
                }
            }(topic, partition)
        }
    }

    // perform setup
    if err := handler.Setup(sess); err != nil {
        _ = sess.release(true)
        return nil, err
    }

    // start consuming
    for topic, partitions := range claims {
        for _, partition := range partitions {
            sess.waitGroup.Add(1)

            go func(topic string, partition int32) {
                defer sess.waitGroup.Done()

                // cancel the session as soon as the first
                // goroutine exits
                defer sess.cancel()

                // consume a single topic/partition, blocking
                sess.consume(topic, partition)
            }(topic, partition)
        }
    }
    return sess, nil
}

func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }
func (s *consumerGroupSession) MemberID() string           { return s.memberID }
func (s *consumerGroupSession) GenerationID() int32        { return s.generationID }

func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
    if pom := s.offsets.findPOM(topic, partition); pom != nil {
        pom.MarkOffset(offset, metadata)
    }
}

func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
    if pom := s.offsets.findPOM(topic, partition); pom != nil {
        pom.ResetOffset(offset, metadata)
    }
}

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
    s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}

func (s *consumerGroupSession) Context() context.Context {
    return s.ctx
}

func (s *consumerGroupSession) consume(topic string, partition int32) {
    // quick exit if rebalance is due
    select {
    case <-s.ctx.Done():
        return
    case <-s.parent.closed:
        return
    default:
    }

    // get next offset
    offset := s.parent.config.Consumer.Offsets.Initial
    if pom := s.offsets.findPOM(topic, partition); pom != nil {
        offset, _ = pom.NextOffset()
    }

    // create new claim
    claim, err := newConsumerGroupClaim(s, topic, partition, offset)
    if err != nil {
        s.parent.handleError(err, topic, partition)
        return
    }

    // handle errors
    go func() {
        for err := range claim.Errors() {
            s.parent.handleError(err, topic, partition)
        }
    }()

    // trigger close when session is done
    go func() {
        select {
        case <-s.ctx.Done():
        case <-s.parent.closed:
        }
        claim.AsyncClose()
    }()

    // start processing
    if err := s.handler.ConsumeClaim(s, claim); err != nil {
        s.parent.handleError(err, topic, partition)
    }

    // ensure consumer is closed & drained
    claim.AsyncClose()
    for _, err := range claim.waitClosed() {
        s.parent.handleError(err, topic, partition)
    }
}

func (s *consumerGroupSession) release(withCleanup bool) (err error) {
    // signal release, stop heartbeat
    s.cancel()

    // wait for consumers to exit
    s.waitGroup.Wait()

    // perform release
    s.releaseOnce.Do(func() {
        if withCleanup {
            if e := s.handler.Cleanup(s); e != nil {
                s.parent.handleError(e, "", -1) // report the Cleanup error, not the (still nil) named return
                err = e
            }
        }

        if e := s.offsets.Close(); e != nil {
            err = e
        }

        close(s.hbDying)
        <-s.hbDead
    })
    return
}

func (s *consumerGroupSession) heartbeatLoop() {
    defer close(s.hbDead)
    defer s.cancel() // trigger the end of the session on exit

    pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
    defer pause.Stop()

    retries := s.parent.config.Metadata.Retry.Max
    for {
        coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
        if err != nil {
            if retries <= 0 {
                s.parent.handleError(err, "", -1)
                return
            }

            select {
            case <-s.hbDying:
                return
            case <-time.After(s.parent.config.Metadata.Retry.Backoff):
                retries--
            }
            continue
        }

        resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
        if err != nil {
            _ = coordinator.Close()
            retries--
            continue
        }

        switch resp.Err {
        case ErrNoError:
            retries = s.parent.config.Metadata.Retry.Max
        case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
            return
        default:
            s.parent.handleError(resp.Err, "", -1) // report the broker-side error; the transport err is nil here
            return
        }

        select {
        case <-pause.C:
        case <-s.hbDying:
            return
        }
    }
}

// --------------------------------------------------------------------

// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// The interface also provides hooks for your consumer group session life-cycle and allows
// you to trigger logic before or after the consume loop(s). A minimal implementation is
// sketched after this interface.
//
// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently;
// ensure that all state is safely protected against race conditions.
type ConsumerGroupHandler interface {
    // Setup is run at the beginning of a new session, before ConsumeClaim.
    Setup(ConsumerGroupSession) error

    // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
    // but before the offsets are committed for the very last time.
    Cleanup(ConsumerGroupSession) error

    // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
    // Once the Messages() channel is closed, the Handler must finish its processing
    // loop and exit.
    ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
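
// A minimal sketch of a conforming handler; the type name "exampleHandler" is
// illustrative and not part of this package:
//
//    type exampleHandler struct{}
//
//    func (exampleHandler) Setup(ConsumerGroupSession) error   { return nil }
//    func (exampleHandler) Cleanup(ConsumerGroupSession) error { return nil }
//
//    func (exampleHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
//        // The loop exits when Messages() is closed at the start of a rebalance.
//        for msg := range claim.Messages() {
//            // process msg, then mark it as consumed
//            sess.MarkMessage(msg, "")
//        }
//        return nil
//    }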

// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupClaim interface {
    // Topic returns the consumed topic name.
    Topic() string

    // Partition returns the consumed partition.
    Partition() int32

    // InitialOffset returns the initial offset that was used as a starting point for this claim.
    InitialOffset() int64

    // HighWaterMarkOffset returns the high water mark offset of the partition,
    // i.e. the offset that will be used for the next message that will be produced.
    // You can use this to determine how far behind the processing is.
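    //
    // For example (a sketch), inside ConsumeClaim the approximate lag behind the
    // last consumed message "msg" is:
    //
    //    lag := claim.HighWaterMarkOffset() - msg.Offset - 1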
    HighWaterMarkOffset() int64

    // Messages returns the read channel for the messages that are returned by
    // the broker. The messages channel will be closed when a new rebalance cycle
    // is due. You must finish processing and mark offsets within
    // Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
    // re-assigned to another group member.
    Messages() <-chan *ConsumerMessage
}

type consumerGroupClaim struct {
    topic     string
    partition int32
    offset    int64
    PartitionConsumer
}

func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
    pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
    if err == ErrOffsetOutOfRange {
        offset = sess.parent.config.Consumer.Offsets.Initial
        pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
    }
    if err != nil {
        return nil, err
    }

    go func() {
        for err := range pcm.Errors() {
            sess.parent.handleError(err, topic, partition)
        }
    }()

    return &consumerGroupClaim{
        topic:             topic,
        partition:         partition,
        offset:            offset,
        PartitionConsumer: pcm,
    }, nil
}

func (c *consumerGroupClaim) Topic() string        { return c.topic }
func (c *consumerGroupClaim) Partition() int32     { return c.partition }
func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset }

// Drains messages and errors, ensures the claim is fully closed.
func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) {
    go func() {
        for range c.Messages() {
        }
    }()

    for err := range c.Errors() {
        errs = append(errs, err)
    }
    return
}