offset_manager.go

package sarama

import (
	"sync"
	"time"
)

// Offset Manager

const groupGenerationUndefined = -1

// OffsetManager uses Kafka to store and fetch consumed partition offsets.
type OffsetManager interface {
	// ManagePartition creates a PartitionOffsetManager on the given topic/partition.
	// It will return an error if this OffsetManager is already managing the given
	// topic/partition.
	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)

	// Close stops the OffsetManager from managing offsets. It is required to call
	// this function before an OffsetManager object passes out of scope, as it
	// will otherwise leak memory. You must call this after all the
	// PartitionOffsetManagers are closed.
	Close() error
}
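
// A minimal usage sketch (not part of this file; the broker address, group,
// topic and partition below are illustrative assumptions):
//
//	config := NewConfig()
//	config.Consumer.Return.Errors = true           // deliver errors via PartitionOffsetManager.Errors()
//	config.Consumer.Offsets.Initial = OffsetOldest // used when no offset has been committed yet
//
//	client, err := NewClient([]string{"localhost:9092"}, config)
//	if err != nil {
//		panic(err)
//	}
//
//	om, err := NewOffsetManagerFromClient("example-group", client)
//	if err != nil {
//		panic(err)
//	}
//
//	pom, err := om.ManagePartition("example-topic", 0)
//	if err != nil {
//		panic(err)
//	}
//
//	// ... consume messages and call pom.MarkOffset(...) ...
//
//	// Shut down in order: partition offset managers first, then the offset
//	// manager, then the underlying client.
//	_ = pom.Close()
//	_ = om.Close()
//	_ = client.Close()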

type offsetManager struct {
	client Client
	conf   *Config
	group  string
	lock   sync.Mutex
	poms   map[string]map[int32]*partitionOffsetManager
	boms   map[*Broker]*brokerOffsetManager
}

// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
// It is still necessary to call Close() on the underlying client when finished with the offset manager.
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	om := &offsetManager{
		client: client,
		conf:   client.Config(),
		group:  group,
		poms:   make(map[string]map[int32]*partitionOffsetManager),
		boms:   make(map[*Broker]*brokerOffsetManager),
	}

	return om, nil
}

func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
	pom, err := om.newPartitionOffsetManager(topic, partition)
	if err != nil {
		return nil, err
	}

	om.lock.Lock()
	defer om.lock.Unlock()

	topicManagers := om.poms[topic]
	if topicManagers == nil {
		topicManagers = make(map[int32]*partitionOffsetManager)
		om.poms[topic] = topicManagers
	}

	if topicManagers[partition] != nil {
		return nil, ConfigurationError("That topic/partition is already being managed")
	}

	topicManagers[partition] = pom
	return pom, nil
}

func (om *offsetManager) Close() error {
	return nil
}

// refBrokerOffsetManager returns the brokerOffsetManager for the given broker,
// creating one if necessary, and increments its reference count.
func (om *offsetManager) refBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
	om.lock.Lock()
	defer om.lock.Unlock()

	bom := om.boms[broker]
	if bom == nil {
		bom = om.newBrokerOffsetManager(broker)
		om.boms[broker] = bom
	}

	bom.refs++

	return bom
}

// unrefBrokerOffsetManager decrements the reference count and tears the
// brokerOffsetManager down once the last reference is released.
func (om *offsetManager) unrefBrokerOffsetManager(bom *brokerOffsetManager) {
	om.lock.Lock()
	defer om.lock.Unlock()

	bom.refs--

	if bom.refs == 0 {
		close(bom.updateSubscriptions)
		if om.boms[bom.broker] == bom {
			delete(om.boms, bom.broker)
		}
	}
}

func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) {
	om.lock.Lock()
	defer om.lock.Unlock()

	delete(om.boms, bom.broker)
}

func (om *offsetManager) abandonPartitionOffsetManager(pom *partitionOffsetManager) {
	om.lock.Lock()
	defer om.lock.Unlock()

	delete(om.poms[pom.topic], pom.partition)
	if len(om.poms[pom.topic]) == 0 {
		delete(om.poms, pom.topic)
	}
}

// Partition Offset Manager

// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
// on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes
// out of scope.
type PartitionOffsetManager interface {
	// NextOffset returns the next offset that should be consumed for the managed
	// partition, accompanied by metadata which can be used to reconstruct the state
	// of the partition consumer when it resumes. NextOffset() will return
	// `config.Consumer.Offsets.Initial` and an empty metadata string if no offset
	// was committed for this partition yet.
	NextOffset() (int64, string)

	// MarkOffset marks the provided offset as processed, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(offset int64, metadata string)

	// Errors returns a read channel of errors that occur during offset management, if
	// enabled. By default, errors are logged and not returned over this channel. If
	// you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will
	// return immediately, after which you should wait until the 'errors' channel has
	// been drained and closed. It is required to call this function, or Close, before
	// a consumer object passes out of scope, as it will otherwise leak memory. You
	// must call this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionOffsetManager from managing offsets. It is required to
	// call this function (or AsyncClose) before a PartitionOffsetManager object
	// passes out of scope, as it will otherwise leak memory. You must call this
	// before calling Close on the underlying client.
	Close() error
}
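
// A sketch of pairing a PartitionOffsetManager with a PartitionConsumer (the
// client, topic, partition and process() helper are assumptions used only for
// illustration):
//
//	consumer, err := NewConsumerFromClient(client)
//	if err != nil {
//		panic(err)
//	}
//
//	offset, _ := pom.NextOffset() // resume from the last committed position
//	pc, err := consumer.ConsumePartition("example-topic", 0, offset)
//	if err != nil {
//		panic(err)
//	}
//
//	for msg := range pc.Messages() {
//		process(msg)                   // hypothetical application logic
//		pom.MarkOffset(msg.Offset, "") // marked offsets are committed asynchronously
//	}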

type partitionOffsetManager struct {
	parent    *offsetManager
	topic     string
	partition int32
	lock      sync.Mutex
	offset    int64
	metadata  string
	dirty     bool
	clean     chan none
	broker    *brokerOffsetManager
	errors    chan *ConsumerError
	rebalance chan none
	dying     chan none
}

func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
	pom := &partitionOffsetManager{
		parent:    om,
		topic:     topic,
		partition: partition,
		clean:     make(chan none),
		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
		rebalance: make(chan none, 1),
		dying:     make(chan none),
	}

	if err := pom.selectBroker(); err != nil {
		return nil, err
	}

	if err := pom.fetchInitialOffset(om.conf.Metadata.Retry.Max); err != nil {
		return nil, err
	}

	pom.broker.updateSubscriptions <- pom

	go withRecover(pom.mainLoop)

	return pom, nil
}

func (pom *partitionOffsetManager) mainLoop() {
	for {
		select {
		case <-pom.rebalance:
			// The coordinator moved; pick the new broker and re-subscribe.
			if err := pom.selectBroker(); err != nil {
				pom.handleError(err)
				pom.rebalance <- none{}
			} else {
				pom.broker.updateSubscriptions <- pom
			}
		case <-pom.dying:
			if pom.broker != nil {
				// Either consume a pending rebalance or unsubscribe from the current broker.
				select {
				case <-pom.rebalance:
				case pom.broker.updateSubscriptions <- pom:
				}
				pom.parent.unrefBrokerOffsetManager(pom.broker)
			}
			pom.parent.abandonPartitionOffsetManager(pom)
			close(pom.errors)
			return
		}
	}
}

func (pom *partitionOffsetManager) selectBroker() error {
	if pom.broker != nil {
		pom.parent.unrefBrokerOffsetManager(pom.broker)
		pom.broker = nil
	}

	var broker *Broker
	var err error

	if err = pom.parent.client.RefreshCoordinator(pom.parent.group); err != nil {
		return err
	}

	if broker, err = pom.parent.client.Coordinator(pom.parent.group); err != nil {
		return err
	}

	pom.broker = pom.parent.refBrokerOffsetManager(broker)
	return nil
}

func (pom *partitionOffsetManager) fetchInitialOffset(retries int) error {
	request := new(OffsetFetchRequest)
	request.Version = 1
	request.ConsumerGroup = pom.parent.group
	request.AddPartition(pom.topic, pom.partition)

	response, err := pom.broker.broker.FetchOffset(request)
	if err != nil {
		return err
	}

	block := response.GetBlock(pom.topic, pom.partition)
	if block == nil {
		return ErrIncompleteResponse
	}

	switch block.Err {
	case ErrNoError:
		pom.offset = block.Offset
		pom.metadata = block.Metadata
		return nil
	case ErrNotCoordinatorForConsumer:
		if retries <= 0 {
			return block.Err
		}
		// The coordinator moved; re-select the broker and retry.
		if err := pom.selectBroker(); err != nil {
			return err
		}
		return pom.fetchInitialOffset(retries - 1)
	case ErrOffsetsLoadInProgress:
		if retries <= 0 {
			return block.Err
		}
		// The broker is still loading offsets; back off and retry.
		time.Sleep(pom.parent.conf.Metadata.Retry.Backoff)
		return pom.fetchInitialOffset(retries - 1)
	default:
		return block.Err
	}
}

func (pom *partitionOffsetManager) handleError(err error) {
	cErr := &ConsumerError{
		Topic:     pom.topic,
		Partition: pom.partition,
		Err:       err,
	}

	if pom.parent.conf.Consumer.Return.Errors {
		pom.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
	return pom.errors
}

func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset > pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset == offset && pom.metadata == metadata {
		pom.dirty = false

		select {
		case pom.clean <- none{}:
		default:
		}
	}
}

func (pom *partitionOffsetManager) NextOffset() (int64, string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset >= 0 {
		return pom.offset + 1, pom.metadata
	}

	return pom.parent.conf.Consumer.Offsets.Initial, ""
}

func (pom *partitionOffsetManager) AsyncClose() {
	go func() {
		pom.lock.Lock()
		dirty := pom.dirty
		pom.lock.Unlock()

		// Wait for any marked-but-uncommitted offset to be flushed before shutting down.
		if dirty {
			<-pom.clean
		}

		close(pom.dying)
	}()
}

func (pom *partitionOffsetManager) Close() error {
	pom.AsyncClose()

	var errors ConsumerErrors
	for err := range pom.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

// Broker Offset Manager

type brokerOffsetManager struct {
	parent              *offsetManager
	broker              *Broker
	timer               *time.Ticker
	updateSubscriptions chan *partitionOffsetManager
	subscriptions       map[*partitionOffsetManager]none
	refs                int
}

func (om *offsetManager) newBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
	bom := &brokerOffsetManager{
		parent:              om,
		broker:              broker,
		timer:               time.NewTicker(om.conf.Consumer.Offsets.CommitInterval),
		updateSubscriptions: make(chan *partitionOffsetManager),
		subscriptions:       make(map[*partitionOffsetManager]none),
	}

	go withRecover(bom.mainLoop)

	return bom
}
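
// The flush cadence above comes from the client configuration; a hedged sketch
// of tuning it (the one-second value is only an example, not a recommendation):
//
//	config := NewConfig()
//	config.Consumer.Offsets.CommitInterval = time.Second
//	client, err := NewClient([]string{"localhost:9092"}, config)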

func (bom *brokerOffsetManager) mainLoop() {
	for {
		select {
		case <-bom.timer.C:
			if len(bom.subscriptions) > 0 {
				bom.flushToBroker()
			}
		case s, ok := <-bom.updateSubscriptions:
			if !ok {
				bom.timer.Stop()
				return
			}
			// A send on updateSubscriptions toggles the partition manager's membership:
			// it is removed if already subscribed, and added otherwise.
			if _, ok := bom.subscriptions[s]; ok {
				delete(bom.subscriptions, s)
			} else {
				bom.subscriptions[s] = none{}
			}
		}
	}
}

func (bom *brokerOffsetManager) flushToBroker() {
	request := bom.constructRequest()
	if request == nil {
		return
	}

	response, err := bom.broker.CommitOffset(request)
	if err != nil {
		bom.abort(err)
		return
	}

	for s := range bom.subscriptions {
		if request.blocks[s.topic] == nil || request.blocks[s.topic][s.partition] == nil {
			continue
		}

		var err KError
		var ok bool

		if response.Errors[s.topic] == nil {
			s.handleError(ErrIncompleteResponse)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
			continue
		}
		if err, ok = response.Errors[s.topic][s.partition]; !ok {
			s.handleError(ErrIncompleteResponse)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
			continue
		}

		switch err {
		case ErrNoError:
			block := request.blocks[s.topic][s.partition]
			s.updateCommitted(block.offset, block.metadata)
		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
		default:
			s.handleError(err)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
		}
	}
}

func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
	r := &OffsetCommitRequest{
		Version:                 1,
		ConsumerGroup:           bom.parent.group,
		ConsumerGroupGeneration: groupGenerationUndefined,
	}

	for s := range bom.subscriptions {
		s.lock.Lock()
		if s.dirty {
			r.AddBlock(s.topic, s.partition, s.offset, ReceiveTime, s.metadata)
		}
		s.lock.Unlock()
	}

	if len(r.blocks) > 0 {
		return r
	}

	return nil
}

func (bom *brokerOffsetManager) abort(err error) {
	_ = bom.broker.Close() // we don't care about the error this might return, we already have one
	bom.parent.abandonBroker(bom)

	for pom := range bom.subscriptions {
		pom.handleError(err)
		pom.rebalance <- none{}
	}

	for s := range bom.updateSubscriptions {
		if _, ok := bom.subscriptions[s]; !ok {
			s.handleError(err)
			s.rebalance <- none{}
		}
	}

	bom.subscriptions = make(map[*partitionOffsetManager]none)
}