offset_manager.go

package sarama

import (
	"sync"
	"time"
)

// Offset Manager

// OffsetManager uses Kafka to store and fetch consumed partition offsets.
type OffsetManager interface {
	// ManagePartition creates a PartitionOffsetManager on the given topic/partition. It will
	// return an error if this OffsetManager is already managing the given topic/partition.
	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)

	// Close stops the OffsetManager from managing offsets. It is required to call this function
	// before an OffsetManager object passes out of scope, as it will otherwise
	// leak memory. You must call this after all the PartitionOffsetManagers are closed.
	Close() error
}

type offsetManager struct {
	client Client
	conf   *Config
	group  string

	lock sync.Mutex
	poms map[string]map[int32]*partitionOffsetManager
	boms map[*Broker]*brokerOffsetManager
}

// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
// It is still necessary to call Close() on the underlying client when finished with the offset manager.
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	om := &offsetManager{
		client: client,
		conf:   client.Config(),
		group:  group,
		poms:   make(map[string]map[int32]*partitionOffsetManager),
		boms:   make(map[*Broker]*brokerOffsetManager),
	}

	return om, nil
}

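// The sketch below is illustrative only and is not part of the original file: it shows
// the intended lifecycle of an OffsetManager, assuming a broker at localhost:9092 and
// hypothetical "example-group" and "example-topic" names.
func exampleOffsetManagerLifecycle() error {
	client, err := NewClient([]string{"localhost:9092"}, nil) // nil uses the default config
	if err != nil {
		return err
	}
	defer client.Close() // closed last, after the offset manager

	om, err := NewOffsetManagerFromClient("example-group", client)
	if err != nil {
		return err
	}
	pom, err := om.ManagePartition("example-topic", 0)
	if err != nil {
		return err
	}

	// Record progress as messages are processed; commits happen periodically in the background.
	pom.MarkOffset(42, "processed up to message 42")

	// Close waits for the last marked offset to be committed before returning.
	if err := pom.Close(); err != nil {
		return err
	}
	return om.Close()
}
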
func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
	pom, err := om.newPartitionOffsetManager(topic, partition)
	if err != nil {
		return nil, err
	}

	om.lock.Lock()
	defer om.lock.Unlock()

	topicManagers := om.poms[topic]
	if topicManagers == nil {
		topicManagers = make(map[int32]*partitionOffsetManager)
		om.poms[topic] = topicManagers
	}

	if topicManagers[partition] != nil {
		return nil, ConfigurationError("That topic/partition is already being managed")
	}

	topicManagers[partition] = pom
	return pom, nil
}

func (om *offsetManager) Close() error {
	return nil
}

// refBrokerOffsetManager returns the brokerOffsetManager for the given broker,
// creating one if none exists yet, and increments its reference count.
func (om *offsetManager) refBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
	om.lock.Lock()
	defer om.lock.Unlock()

	bom := om.boms[broker]
	if bom == nil {
		bom = om.newBrokerOffsetManager(broker)
		om.boms[broker] = bom
	}

	bom.refs++

	return bom
}

// unrefBrokerOffsetManager decrements the reference count and, when it reaches
// zero, shuts the brokerOffsetManager down and removes it from the map.
func (om *offsetManager) unrefBrokerOffsetManager(bom *brokerOffsetManager) {
	om.lock.Lock()
	defer om.lock.Unlock()

	bom.refs--

	if bom.refs == 0 {
		close(bom.updateSubscriptions)
		if om.boms[bom.broker] == bom {
			delete(om.boms, bom.broker)
		}
	}
}

func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) {
	om.lock.Lock()
	defer om.lock.Unlock()

	delete(om.boms, bom.broker)
}

func (om *offsetManager) abandonPartitionOffsetManager(pom *partitionOffsetManager) {
	om.lock.Lock()
	defer om.lock.Unlock()

	delete(om.poms[pom.topic], pom.partition)
	if len(om.poms[pom.topic]) == 0 {
		delete(om.poms, pom.topic)
	}
}

// Partition Offset Manager

// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
// on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes
// out of scope.
type PartitionOffsetManager interface {
	// NextOffset returns the next offset that should be consumed for the managed partition, accompanied
	// by metadata which can be used to reconstruct the state of the partition consumer when it resumes.
	// NextOffset() will return `config.Consumer.Offsets.Initial` and an empty metadata string if no
	// offset was committed for this partition yet.
	NextOffset() (int64, string)

	// MarkOffset marks the provided offset as processed, alongside a metadata string that represents
	// the state of the partition consumer at that point in time. The metadata string can be used by
	// another consumer to restore that state, so it can resume consumption.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend store immediately
	// for efficiency reasons, and it may never be committed if your application crashes. This means that
	// you may end up processing the same message twice, and your processing should ideally be idempotent.
	MarkOffset(offset int64, metadata string)

	// Errors returns a read channel of errors that occur during offset management, if enabled. By default,
	// errors are logged and not returned over this channel. If you want to implement any custom error
	// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will return immediately,
	// after which you should wait until the 'errors' channel has been drained and closed.
	// It is required to call this function (or Close) before a PartitionOffsetManager object passes out
	// of scope, as it will otherwise leak memory. You must call this before calling Close on the
	// underlying client.
	AsyncClose()

	// Close stops the PartitionOffsetManager from managing offsets. It is required to call this function
	// (or AsyncClose) before a PartitionOffsetManager object passes out of scope, as it will otherwise
	// leak memory. You must call this before calling Close on the underlying client.
	Close() error
}

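// Illustrative sketch (not part of the original file): a typical pattern for combining a
// PartitionOffsetManager with a partition consumer, resuming from the last committed offset
// and marking messages as they are processed. The client, offset manager, topic name and
// partition number are placeholders supplied by the caller.
func examplePartitionOffsetManagerUsage(client Client, pom PartitionOffsetManager) error {
	consumer, err := NewConsumerFromClient(client)
	if err != nil {
		return err
	}
	defer consumer.Close()

	offset, _ := pom.NextOffset() // resume where the group last left off
	pc, err := consumer.ConsumePartition("example-topic", 0, offset)
	if err != nil {
		return err
	}
	defer pc.Close()

	for i := 0; i < 10; i++ {
		msg := <-pc.Messages()
		// ... application-specific processing of msg goes here ...
		pom.MarkOffset(msg.Offset, "") // record progress; committed on the next periodic flush
	}
	return nil
}
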
type partitionOffsetManager struct {
	parent    *offsetManager
	topic     string
	partition int32

	lock     sync.Mutex
	offset   int64
	metadata string
	dirty    bool
	clean    chan none

	broker    *brokerOffsetManager
	errors    chan *ConsumerError
	rebalance chan none
	dying     chan none
}

func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
	pom := &partitionOffsetManager{
		parent:    om,
		topic:     topic,
		partition: partition,
		clean:     make(chan none),
		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
		rebalance: make(chan none, 1),
		dying:     make(chan none),
	}

	if err := pom.selectBroker(); err != nil {
		return nil, err
	}

	if err := pom.fetchInitialOffset(om.conf.Metadata.Retry.Max); err != nil {
		return nil, err
	}

	pom.broker.updateSubscriptions <- pom

	go withRecover(pom.mainLoop)

	return pom, nil
}

func (pom *partitionOffsetManager) mainLoop() {
	for {
		select {
		case <-pom.rebalance:
			if err := pom.selectBroker(); err != nil {
				pom.handleError(err)
				pom.rebalance <- none{}
			} else {
				pom.broker.updateSubscriptions <- pom
			}
		case <-pom.dying:
			if pom.broker != nil {
				select {
				case <-pom.rebalance:
				case pom.broker.updateSubscriptions <- pom:
				}
				pom.parent.unrefBrokerOffsetManager(pom.broker)
			}
			pom.parent.abandonPartitionOffsetManager(pom)
			close(pom.errors)
			return
		}
	}
}

func (pom *partitionOffsetManager) selectBroker() error {
	if pom.broker != nil {
		pom.parent.unrefBrokerOffsetManager(pom.broker)
		pom.broker = nil
	}

	var broker *Broker
	var err error

	if err = pom.parent.client.RefreshCoordinator(pom.parent.group); err != nil {
		return err
	}

	if broker, err = pom.parent.client.Coordinator(pom.parent.group); err != nil {
		return err
	}

	pom.broker = pom.parent.refBrokerOffsetManager(broker)
	return nil
}

func (pom *partitionOffsetManager) fetchInitialOffset(retries int) error {
	request := new(OffsetFetchRequest)
	request.Version = 1
	request.ConsumerGroup = pom.parent.group
	request.AddPartition(pom.topic, pom.partition)

	response, err := pom.broker.broker.FetchOffset(request)
	if err != nil {
		return err
	}

	block := response.GetBlock(pom.topic, pom.partition)
	if block == nil {
		return ErrIncompleteResponse
	}

	switch block.Err {
	case ErrNoError:
		pom.offset = block.Offset
		pom.metadata = block.Metadata
		return nil
	case ErrNotCoordinatorForConsumer:
		if retries <= 0 {
			return block.Err
		}
		if err := pom.selectBroker(); err != nil {
			return err
		}
		return pom.fetchInitialOffset(retries - 1)
	case ErrOffsetsLoadInProgress:
		if retries <= 0 {
			return block.Err
		}
		time.Sleep(pom.parent.conf.Metadata.Retry.Backoff)
		return pom.fetchInitialOffset(retries - 1)
	default:
		return block.Err
	}
}

func (pom *partitionOffsetManager) handleError(err error) {
	cErr := &ConsumerError{
		Topic:     pom.topic,
		Partition: pom.partition,
		Err:       err,
	}

	if pom.parent.conf.Consumer.Return.Errors {
		pom.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
	return pom.errors
}

func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	pom.offset = offset
	pom.metadata = metadata
	pom.dirty = true
}

// updateCommitted is called once a commit succeeds; if nothing was marked in
// the meantime, it clears the dirty flag and signals any pending close.
func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset == offset && pom.metadata == metadata {
		pom.dirty = false
		select {
		case pom.clean <- none{}:
		default:
		}
	}
}

func (pom *partitionOffsetManager) NextOffset() (int64, string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset >= 0 {
		return pom.offset + 1, pom.metadata
	}

	return pom.parent.conf.Consumer.Offsets.Initial, ""
}

func (pom *partitionOffsetManager) AsyncClose() {
	go func() {
		pom.lock.Lock()
		dirty := pom.dirty
		pom.lock.Unlock()

		// Wait for any marked-but-uncommitted offset to be flushed before shutting down.
		if dirty {
			<-pom.clean
		}

		close(pom.dying)
	}()
}

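// Illustrative sketch (not part of the original file): the asynchronous shutdown pattern,
// calling AsyncClose and then draining Errors() until it is closed. It assumes the config
// was created with Consumer.Return.Errors set to true so errors arrive on the channel
// rather than only being logged.
func exampleAsyncShutdown(pom PartitionOffsetManager) {
	pom.AsyncClose()

	// The errors channel is closed once the shutdown has completed.
	for err := range pom.Errors() {
		Logger.Println(err)
	}
}
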
func (pom *partitionOffsetManager) Close() error {
	pom.AsyncClose()

	var errors ConsumerErrors
	for err := range pom.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

// Broker Offset Manager

type brokerOffsetManager struct {
	parent              *offsetManager
	broker              *Broker
	timer               *time.Ticker
	updateSubscriptions chan *partitionOffsetManager
	subscriptions       map[*partitionOffsetManager]none
	refs                int
}

func (om *offsetManager) newBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
	bom := &brokerOffsetManager{
		parent:              om,
		broker:              broker,
		timer:               time.NewTicker(om.conf.Consumer.Offsets.CommitInterval),
		updateSubscriptions: make(chan *partitionOffsetManager),
		subscriptions:       make(map[*partitionOffsetManager]none),
	}

	go withRecover(bom.mainLoop)

	return bom
}

func (bom *brokerOffsetManager) mainLoop() {
	for {
		select {
		case <-bom.timer.C:
			if len(bom.subscriptions) > 0 {
				bom.flushToBroker()
			}
		case s, ok := <-bom.updateSubscriptions:
			if !ok {
				bom.timer.Stop()
				return
			}
			// A send on updateSubscriptions toggles membership: it subscribes a new
			// partition manager, or unsubscribes one that is already present.
			if _, ok := bom.subscriptions[s]; ok {
				delete(bom.subscriptions, s)
			} else {
				bom.subscriptions[s] = none{}
			}
		}
	}
}

func (bom *brokerOffsetManager) flushToBroker() {
	request := bom.constructRequest()
	if request == nil {
		return
	}

	response, err := bom.broker.CommitOffset(request)
	if err != nil {
		bom.abort(err)
		return
	}

	for s := range bom.subscriptions {
		if request.blocks[s.topic] == nil || request.blocks[s.topic][s.partition] == nil {
			continue
		}

		var err KError
		var ok bool

		if response.Errors[s.topic] == nil {
			s.handleError(ErrIncompleteResponse)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
			continue
		}
		if err, ok = response.Errors[s.topic][s.partition]; !ok {
			s.handleError(ErrIncompleteResponse)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
			continue
		}

		switch err {
		case ErrNoError:
			block := request.blocks[s.topic][s.partition]
			s.updateCommitted(block.offset, block.metadata)
		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
			// not a fatal error; the partition manager just needs to pick a coordinator again
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
		default:
			s.handleError(err)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
		}
	}
}

func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
	r := &OffsetCommitRequest{
		Version:       1,
		ConsumerGroup: bom.parent.group,
	}

	for s := range bom.subscriptions {
		s.lock.Lock()
		if s.dirty {
			r.AddBlock(s.topic, s.partition, s.offset, 0, s.metadata)
		}
		s.lock.Unlock()
	}

	if len(r.blocks) > 0 {
		return r
	}
	return nil
}

func (bom *brokerOffsetManager) abort(err error) {
	_ = bom.broker.Close() // we don't care about the error this might return, we already have one
	bom.parent.abandonBroker(bom)

	for pom := range bom.subscriptions {
		pom.handleError(err)
		pom.rebalance <- none{}
	}

	for s := range bom.updateSubscriptions {
		if _, ok := bom.subscriptions[s]; !ok {
			s.handleError(err)
			s.rebalance <- none{}
		}
	}

	bom.subscriptions = make(map[*partitionOffsetManager]none)
}