offset_manager.go

package sarama

import (
	"sync"
	"time"
)

// Offset Manager

// OffsetManager uses Kafka to store and fetch consumed partition offsets.
type OffsetManager interface {
	// ManagePartition creates a PartitionOffsetManager on the given topic/partition. It will
	// return an error if this OffsetManager is already managing the given topic/partition.
	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)

	// Close stops the OffsetManager from managing offsets. It is required to call this function
	// before an OffsetManager object passes out of scope, as it will otherwise
	// leak memory. You must call this after all the PartitionOffsetManagers are closed.
	Close() error
}

type offsetManager struct {
	client Client
	conf   *Config
	group  string

	lock sync.Mutex
	poms map[string]map[int32]*partitionOffsetManager
	boms map[*Broker]*brokerOffsetManager
}

// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
// It is still necessary to call Close() on the underlying client when finished with the offset manager.
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	om := &offsetManager{
		client: client,
		conf:   client.Config(),
		group:  group,
		poms:   make(map[string]map[int32]*partitionOffsetManager),
		boms:   make(map[*Broker]*brokerOffsetManager),
	}

	return om, nil
}
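// exampleOffsetManagerSetup is an illustrative sketch, not part of the upstream
// file: it shows one way to wire a Client into an OffsetManager and manage a
// single partition. The broker address, group, topic, and partition below are
// assumptions made for the example only.
func exampleOffsetManagerSetup() error {
	client, err := NewClient([]string{"localhost:9092"}, NewConfig()) // placeholder broker address
	if err != nil {
		return err
	}
	defer client.Close() // the client must outlive the offset manager

	om, err := NewOffsetManagerFromClient("example-group", client) // placeholder group
	if err != nil {
		return err
	}
	defer om.Close() // close only after all PartitionOffsetManagers are closed

	pom, err := om.ManagePartition("example-topic", 0) // placeholder topic/partition
	if err != nil {
		return err
	}
	defer pom.Close() // deferred last, so it runs first (before om.Close and client.Close)

	// pom.NextOffset() would now report where consumption should resume.
	return nil
}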
func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
	pom, err := om.newPartitionOffsetManager(topic, partition)
	if err != nil {
		return nil, err
	}

	om.lock.Lock()
	defer om.lock.Unlock()

	topicManagers := om.poms[topic]
	if topicManagers == nil {
		topicManagers = make(map[int32]*partitionOffsetManager)
		om.poms[topic] = topicManagers
	}

	if topicManagers[partition] != nil {
		return nil, ConfigurationError("That topic/partition is already being managed")
	}

	topicManagers[partition] = pom
	return pom, nil
}

func (om *offsetManager) Close() error {
	return nil
}

func (om *offsetManager) refBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
	om.lock.Lock()
	defer om.lock.Unlock()

	bom := om.boms[broker]
	if bom == nil {
		bom = om.newBrokerOffsetManager(broker)
		om.boms[broker] = bom
	}

	bom.refs++

	return bom
}

func (om *offsetManager) unrefBrokerOffsetManager(bom *brokerOffsetManager) {
	om.lock.Lock()
	defer om.lock.Unlock()

	bom.refs--

	if bom.refs == 0 {
		close(bom.updateSubscriptions)
		if om.boms[bom.broker] == bom {
			delete(om.boms, bom.broker)
		}
	}
}

func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) {
	om.lock.Lock()
	defer om.lock.Unlock()

	delete(om.boms, bom.broker)
}

func (om *offsetManager) abandonPartitionOffsetManager(pom *partitionOffsetManager) {
	om.lock.Lock()
	defer om.lock.Unlock()

	delete(om.poms[pom.topic], pom.partition)
	if len(om.poms[pom.topic]) == 0 {
		delete(om.poms, pom.topic)
	}
}

// Partition Offset Manager

// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
// on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes
// out of scope.
type PartitionOffsetManager interface {
	// NextOffset returns the next offset that should be consumed for the managed partition, accompanied
	// by metadata which can be used to reconstruct the state of the partition consumer when it resumes.
	// NextOffset() will return `config.Consumer.Offsets.Initial` and an empty metadata string if no
	// offset was committed for this partition yet.
	NextOffset() (int64, string)

	// MarkOffset marks the provided offset as processed, alongside a metadata string that represents
	// the state of the partition consumer at that point in time. The metadata string can be used by
	// another consumer to restore that state, so it can resume consumption.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend store immediately
	// for efficiency reasons, and it may never be committed if your application crashes. This means that
	// you may end up processing the same message twice, and your processing should ideally be idempotent.
	MarkOffset(offset int64, metadata string)

	// Errors returns a read channel of errors that occur during offset management, if enabled. By default,
	// errors are logged and not returned over this channel. If you want to implement any custom error
	// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will return immediately,
	// after which you should wait until the 'errors' channel has been drained and closed.
	// It is required to call this function, or Close, before a PartitionOffsetManager object passes out
	// of scope, as it will otherwise leak memory. You must call this before calling Close on the
	// underlying client.
	AsyncClose()

	// Close stops the PartitionOffsetManager from managing offsets. It is required to call this function
	// (or AsyncClose) before a PartitionOffsetManager object passes out of scope, as it will otherwise
	// leak memory. You must call this before calling Close on the underlying client.
	Close() error
}
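// examplePartitionOffsetManagerUsage is an illustrative sketch, not part of the
// upstream file: it shows how the interface above is typically used, assuming
// `pom` was obtained from OffsetManager.ManagePartition as in the setup sketch
// earlier in this file. The message-handling loop is stubbed out.
func examplePartitionOffsetManagerUsage(pom PartitionOffsetManager) {
	offset, metadata := pom.NextOffset()
	_ = metadata // metadata could be used to restore application-level consumer state

	// Consume a few messages starting at `offset` (consumption itself is stubbed
	// out here), marking each one as processed. Commits happen asynchronously,
	// in batches, on the configured commit interval.
	for i := int64(0); i < 10; i++ {
		pom.MarkOffset(offset+i, "")
	}

	// Shut down: AsyncClose returns immediately; the Errors channel must then be
	// drained until it is closed (errors are only delivered on this channel when
	// Consumer.Return.Errors is enabled).
	pom.AsyncClose()
	for err := range pom.Errors() {
		Logger.Println(err)
	}
}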
type partitionOffsetManager struct {
	parent    *offsetManager
	topic     string
	partition int32

	lock     sync.Mutex
	offset   int64
	metadata string
	dirty    bool
	clean    chan none

	broker *brokerOffsetManager

	errors    chan *ConsumerError
	rebalance chan none
	dying     chan none
}

func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
	pom := &partitionOffsetManager{
		parent:    om,
		topic:     topic,
		partition: partition,
		clean:     make(chan none),
		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
		rebalance: make(chan none, 1),
		dying:     make(chan none),
	}

	if err := pom.selectBroker(); err != nil {
		return nil, err
	}

	if err := pom.fetchInitialOffset(om.conf.Metadata.Retry.Max); err != nil {
		return nil, err
	}

	pom.broker.updateSubscriptions <- pom

	go withRecover(pom.mainLoop)

	return pom, nil
}

func (pom *partitionOffsetManager) mainLoop() {
	for {
		select {
		case <-pom.rebalance:
			if err := pom.selectBroker(); err != nil {
				pom.handleError(err)
				pom.rebalance <- none{}
			} else {
				pom.broker.updateSubscriptions <- pom
			}
		case <-pom.dying:
			if pom.broker != nil {
				select {
				case <-pom.rebalance:
				case pom.broker.updateSubscriptions <- pom:
				}
				pom.parent.unrefBrokerOffsetManager(pom.broker)
			}
			pom.parent.abandonPartitionOffsetManager(pom)
			close(pom.errors)
			return
		}
	}
}

func (pom *partitionOffsetManager) selectBroker() error {
	if pom.broker != nil {
		pom.parent.unrefBrokerOffsetManager(pom.broker)
		pom.broker = nil
	}

	var broker *Broker
	var err error

	if err = pom.parent.client.RefreshCoordinator(pom.parent.group); err != nil {
		return err
	}

	if broker, err = pom.parent.client.Coordinator(pom.parent.group); err != nil {
		return err
	}

	pom.broker = pom.parent.refBrokerOffsetManager(broker)
	return nil
}

func (pom *partitionOffsetManager) fetchInitialOffset(retries int) error {
	request := new(OffsetFetchRequest)
	request.Version = 1
	request.ConsumerGroup = pom.parent.group
	request.AddPartition(pom.topic, pom.partition)

	response, err := pom.broker.broker.FetchOffset(request)
	if err != nil {
		return err
	}

	block := response.GetBlock(pom.topic, pom.partition)
	if block == nil {
		return ErrIncompleteResponse
	}

	switch block.Err {
	case ErrNoError:
		pom.offset = block.Offset
		pom.metadata = block.Metadata
		return nil
	case ErrNotCoordinatorForConsumer:
		if retries <= 0 {
			return block.Err
		}
		if err := pom.selectBroker(); err != nil {
			return err
		}
		return pom.fetchInitialOffset(retries - 1)
	case ErrOffsetsLoadInProgress:
		if retries <= 0 {
			return block.Err
		}
		time.Sleep(pom.parent.conf.Metadata.Retry.Backoff)
		return pom.fetchInitialOffset(retries - 1)
	default:
		return block.Err
	}
}

func (pom *partitionOffsetManager) handleError(err error) {
	cErr := &ConsumerError{
		Topic:     pom.topic,
		Partition: pom.partition,
		Err:       err,
	}

	if pom.parent.conf.Consumer.Return.Errors {
		pom.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
	return pom.errors
}

func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset > pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset == offset && pom.metadata == metadata {
		pom.dirty = false
		select {
		case pom.clean <- none{}:
		default:
		}
	}
}

func (pom *partitionOffsetManager) NextOffset() (int64, string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset >= 0 {
		return pom.offset + 1, pom.metadata
	}

	return pom.parent.conf.Consumer.Offsets.Initial, ""
}

func (pom *partitionOffsetManager) AsyncClose() {
	go func() {
		pom.lock.Lock()
		dirty := pom.dirty
		pom.lock.Unlock()

		if dirty {
			<-pom.clean
		}

		close(pom.dying)
	}()
}

func (pom *partitionOffsetManager) Close() error {
	pom.AsyncClose()

	var errors ConsumerErrors
	for err := range pom.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

// Broker Offset Manager

type brokerOffsetManager struct {
	parent              *offsetManager
	broker              *Broker
	timer               *time.Ticker
	updateSubscriptions chan *partitionOffsetManager
	subscriptions       map[*partitionOffsetManager]none
	refs                int
}

func (om *offsetManager) newBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
	bom := &brokerOffsetManager{
		parent:              om,
		broker:              broker,
		timer:               time.NewTicker(om.conf.Consumer.Offsets.CommitInterval),
		updateSubscriptions: make(chan *partitionOffsetManager),
		subscriptions:       make(map[*partitionOffsetManager]none),
	}

	go withRecover(bom.mainLoop)

	return bom
}

func (bom *brokerOffsetManager) mainLoop() {
	for {
		select {
		case <-bom.timer.C:
			if len(bom.subscriptions) > 0 {
				bom.flushToBroker()
			}
		case s, ok := <-bom.updateSubscriptions:
			if !ok {
				bom.timer.Stop()
				return
			}
			if _, ok := bom.subscriptions[s]; ok {
				delete(bom.subscriptions, s)
			} else {
				bom.subscriptions[s] = none{}
			}
		}
	}
}

func (bom *brokerOffsetManager) flushToBroker() {
	request := bom.constructRequest()
	if request == nil {
		return
	}

	response, err := bom.broker.CommitOffset(request)
	if err != nil {
		bom.abort(err)
		return
	}

	for s := range bom.subscriptions {
		if request.blocks[s.topic] == nil || request.blocks[s.topic][s.partition] == nil {
			continue
		}

		var err KError
		var ok bool

		if response.Errors[s.topic] == nil {
			s.handleError(ErrIncompleteResponse)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
			continue
		}
		if err, ok = response.Errors[s.topic][s.partition]; !ok {
			s.handleError(ErrIncompleteResponse)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
			continue
		}

		switch err {
		case ErrNoError:
			block := request.blocks[s.topic][s.partition]
			s.updateCommitted(block.offset, block.metadata)
		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
		default:
			s.handleError(err)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
		}
	}
}

func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
	r := &OffsetCommitRequest{
		Version:       1,
		ConsumerGroup: bom.parent.group,
	}

	for s := range bom.subscriptions {
		s.lock.Lock()
		if s.dirty {
			r.AddBlock(s.topic, s.partition, s.offset, ReceiveTime, s.metadata)
		}
		s.lock.Unlock()
	}

	if len(r.blocks) > 0 {
		return r
	}

	return nil
}

func (bom *brokerOffsetManager) abort(err error) {
	_ = bom.broker.Close() // we don't care about the error this might return, we already have one
	bom.parent.abandonBroker(bom)

	for pom := range bom.subscriptions {
		pom.handleError(err)
		pom.rebalance <- none{}
	}

	for s := range bom.updateSubscriptions {
		if _, ok := bom.subscriptions[s]; !ok {
			s.handleError(err)
			s.rebalance <- none{}
		}
	}

	bom.subscriptions = make(map[*partitionOffsetManager]none)
}