// offset_manager.go
  1. package sarama
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. // Offset Manager
  7. // OffsetManager uses Kafka to store and fetch consumed partition offsets.
  8. type OffsetManager interface {
  9. // ManagePartition creates a PartitionOffsetManager on the given topic/partition. It will
  10. // return an error if this OffsetManager is already managing the given topic/partition.
  11. ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)
  12. }
  13. type offsetManager struct {
  14. client Client
  15. conf *Config
  16. group string
  17. lock sync.Mutex
  18. poms map[string]map[int32]*partitionOffsetManager
  19. boms map[*Broker]*brokerOffsetManager
  20. }
  21. // NewOffsetManagerFromClient creates a new OffsetManager from the given client.
  22. // It is still necessary to call Close() on the underlying client when finished with the partition manager.
  23. func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
  24. // Check that we are not dealing with a closed Client before processing any other arguments
  25. if client.Closed() {
  26. return nil, ErrClosedClient
  27. }
  28. om := &offsetManager{
  29. client: client,
  30. conf: client.Config(),
  31. group: group,
  32. poms: make(map[string]map[int32]*partitionOffsetManager),
  33. boms: make(map[*Broker]*brokerOffsetManager),
  34. }
  35. return om, nil
  36. }
  37. func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
  38. pom, err := om.newPartitionOffsetManager(topic, partition)
  39. if err != nil {
  40. return nil, err
  41. }
  42. om.lock.Lock()
  43. defer om.lock.Unlock()
  44. topicManagers := om.poms[topic]
  45. if topicManagers == nil {
  46. topicManagers = make(map[int32]*partitionOffsetManager)
  47. om.poms[topic] = topicManagers
  48. }
  49. if topicManagers[partition] != nil {
  50. return nil, ConfigurationError("That topic/partition is already being managed")
  51. }
  52. topicManagers[partition] = pom
  53. return pom, nil
  54. }
  55. func (om *offsetManager) refBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
  56. om.lock.Lock()
  57. defer om.lock.Unlock()
  58. bom := om.boms[broker]
  59. if bom == nil {
  60. bom = om.newBrokerOffsetManager(broker)
  61. om.boms[broker] = bom
  62. }
  63. bom.refs++
  64. return bom
  65. }
  66. func (om *offsetManager) unrefBrokerOffsetManager(bom *brokerOffsetManager) {
  67. om.lock.Lock()
  68. defer om.lock.Unlock()
  69. bom.refs--
  70. if bom.refs == 0 {
  71. close(bom.updateSubscriptions)
  72. if om.boms[bom.broker] == bom {
  73. delete(om.boms, bom.broker)
  74. }
  75. }
  76. }
  77. func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) {
  78. om.lock.Lock()
  79. defer om.lock.Unlock()
  80. delete(om.boms, bom.broker)
  81. }
  82. // Partition Offset Manager
  83. // PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
  84. // on a partition offset manager to avoid leaks, it will not be garbage-collected automatically when it passes
  85. // out of scope.
  86. type PartitionOffsetManager interface {
  87. // Offset returns the last offset that was marked as processed and associated metadata according to the manager;
  88. // this value has not necessarily been flushed to the cluster yet. If you want to resume a partition consumer
  89. // from where it left off, remember that you have to increment the offset by one so the partition consumer will
  90. // start at the next message. This prevents the last committed message from being processed twice.
  91. Offset() (int64, string)
  92. // SetOffset sets the offset and metadata according to the manager; this value (or a subsequent update)
  93. // will eventually be flushed to the cluster based on configuration. You should only set the offset of
  94. // messages that have been completely processed.
  95. SetOffset(offset int64, metadata string)
  96. // Errors returns a read channel of errors that occur during offset management, if enabled. By default,
  97. // errors are logged and not returned over this channel. If you want to implement any custom error
  98. // handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
  99. Errors() <-chan *ConsumerError
  100. // AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will return immediately,
  101. // after which you should wait until the 'errors' channel has been drained and closed.
  102. // It is required to call this function, or Close before a consumer object passes out of scope,
  103. // as it will otherwise leak memory. You must call this before calling Close on the underlying
  104. // client.
  105. AsyncClose()
  106. // Close stops the PartitionOffsetManager from managing offsets. It is required to call this function
  107. // (or AsyncClose) before a PartitionOffsetManager object passes out of scope, as it will otherwise
  108. // leak memory. You must call this before calling Close on the underlying client.
  109. Close() error
  110. }
  111. type partitionOffsetManager struct {
  112. parent *offsetManager
  113. topic string
  114. partition int32
  115. lock sync.Mutex
  116. offset int64
  117. metadata string
  118. dirty bool
  119. clean chan none
  120. broker *brokerOffsetManager
  121. errors chan *ConsumerError
  122. rebalance chan none
  123. dying chan none
  124. }
  125. func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
  126. pom := &partitionOffsetManager{
  127. parent: om,
  128. topic: topic,
  129. partition: partition,
  130. clean: make(chan none),
  131. errors: make(chan *ConsumerError, om.conf.ChannelBufferSize),
  132. rebalance: make(chan none, 1),
  133. dying: make(chan none),
  134. }
  135. if err := pom.selectBroker(); err != nil {
  136. return nil, err
  137. }
  138. if err := pom.fetchInitialOffset(om.conf.Metadata.Retry.Max); err != nil {
  139. return nil, err
  140. }
  141. pom.broker.updateSubscriptions <- pom
  142. go withRecover(pom.mainLoop)
  143. return pom, nil
  144. }
  145. func (pom *partitionOffsetManager) mainLoop() {
  146. for {
  147. select {
  148. case <-pom.rebalance:
  149. if err := pom.selectBroker(); err != nil {
  150. pom.handleError(err)
  151. pom.rebalance <- none{}
  152. } else {
  153. pom.broker.updateSubscriptions <- pom
  154. }
  155. case <-pom.dying:
  156. if pom.broker != nil {
  157. select {
  158. case <-pom.rebalance:
  159. case pom.broker.updateSubscriptions <- pom:
  160. }
  161. pom.parent.unrefBrokerOffsetManager(pom.broker)
  162. }
  163. close(pom.errors)
  164. return
  165. }
  166. }
  167. }
  168. func (pom *partitionOffsetManager) selectBroker() error {
  169. if pom.broker != nil {
  170. pom.parent.unrefBrokerOffsetManager(pom.broker)
  171. pom.broker = nil
  172. }
  173. var broker *Broker
  174. var err error
  175. if err = pom.parent.client.RefreshCoordinator(pom.parent.group); err != nil {
  176. return err
  177. }
  178. if broker, err = pom.parent.client.Coordinator(pom.parent.group); err != nil {
  179. return err
  180. }
  181. pom.broker = pom.parent.refBrokerOffsetManager(broker)
  182. return nil
  183. }
  184. func (pom *partitionOffsetManager) fetchInitialOffset(retries int) error {
  185. request := new(OffsetFetchRequest)
  186. request.Version = 1
  187. request.ConsumerGroup = pom.parent.group
  188. request.AddPartition(pom.topic, pom.partition)
  189. response, err := pom.broker.broker.FetchOffset(request)
  190. if err != nil {
  191. return err
  192. }
  193. block := response.GetBlock(pom.topic, pom.partition)
  194. if block == nil {
  195. return ErrIncompleteResponse
  196. }
  197. switch block.Err {
  198. case ErrNoError:
  199. pom.offset = block.Offset
  200. pom.metadata = block.Metadata
  201. return nil
  202. case ErrNotCoordinatorForConsumer:
  203. if retries <= 0 {
  204. return block.Err
  205. }
  206. if err := pom.selectBroker(); err != nil {
  207. return err
  208. }
  209. return pom.fetchInitialOffset(retries - 1)
  210. case ErrOffsetsLoadInProgress:
  211. if retries <= 0 {
  212. return block.Err
  213. }
  214. time.Sleep(pom.parent.conf.Metadata.Retry.Backoff)
  215. return pom.fetchInitialOffset(retries - 1)
  216. default:
  217. return block.Err
  218. }
  219. }
  220. func (pom *partitionOffsetManager) handleError(err error) {
  221. cErr := &ConsumerError{
  222. Topic: pom.topic,
  223. Partition: pom.partition,
  224. Err: err,
  225. }
  226. if pom.parent.conf.Consumer.Return.Errors {
  227. pom.errors <- cErr
  228. } else {
  229. Logger.Println(cErr)
  230. }
  231. }
  232. func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
  233. return pom.errors
  234. }
  235. func (pom *partitionOffsetManager) SetOffset(offset int64, metadata string) {
  236. pom.lock.Lock()
  237. defer pom.lock.Unlock()
  238. pom.offset = offset
  239. pom.metadata = metadata
  240. pom.dirty = true
  241. }
  242. func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
  243. pom.lock.Lock()
  244. defer pom.lock.Unlock()
  245. if pom.offset == offset && pom.metadata == metadata {
  246. pom.dirty = false
  247. select {
  248. case pom.clean <- none{}:
  249. default:
  250. }
  251. }
  252. }
  253. func (pom *partitionOffsetManager) Offset() (int64, string) {
  254. pom.lock.Lock()
  255. defer pom.lock.Unlock()
  256. return pom.offset, pom.metadata
  257. }
  258. func (pom *partitionOffsetManager) AsyncClose() {
  259. go func() {
  260. pom.lock.Lock()
  261. dirty := pom.dirty
  262. pom.lock.Unlock()
  263. if dirty {
  264. <-pom.clean
  265. }
  266. close(pom.dying)
  267. }()
  268. }
  269. func (pom *partitionOffsetManager) Close() error {
  270. pom.AsyncClose()
  271. var errors ConsumerErrors
  272. for err := range pom.errors {
  273. errors = append(errors, err)
  274. }
  275. if len(errors) > 0 {
  276. return errors
  277. }
  278. return nil
  279. }
  280. // Broker Offset Manager
  281. type brokerOffsetManager struct {
  282. parent *offsetManager
  283. broker *Broker
  284. timer *time.Ticker
  285. updateSubscriptions chan *partitionOffsetManager
  286. subscriptions map[*partitionOffsetManager]none
  287. refs int
  288. }
  289. func (om *offsetManager) newBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
  290. bom := &brokerOffsetManager{
  291. parent: om,
  292. broker: broker,
  293. timer: time.NewTicker(om.conf.Consumer.Offsets.CommitInterval),
  294. updateSubscriptions: make(chan *partitionOffsetManager),
  295. subscriptions: make(map[*partitionOffsetManager]none),
  296. }
  297. go withRecover(bom.mainLoop)
  298. return bom
  299. }
  300. func (bom *brokerOffsetManager) mainLoop() {
  301. for {
  302. select {
  303. case <-bom.timer.C:
  304. if len(bom.subscriptions) > 0 {
  305. bom.flushToBroker()
  306. }
  307. case s, ok := <-bom.updateSubscriptions:
  308. if !ok {
  309. bom.timer.Stop()
  310. return
  311. }
  312. if _, ok := bom.subscriptions[s]; ok {
  313. delete(bom.subscriptions, s)
  314. } else {
  315. bom.subscriptions[s] = none{}
  316. }
  317. }
  318. }
  319. }
  320. func (bom *brokerOffsetManager) flushToBroker() {
  321. request := bom.constructRequest()
  322. if request == nil {
  323. return
  324. }
  325. response, err := bom.broker.CommitOffset(request)
  326. if err != nil {
  327. bom.abort(err)
  328. return
  329. }
  330. for s := range bom.subscriptions {
  331. if request.blocks[s.topic] == nil || request.blocks[s.topic][s.partition] == nil {
  332. continue
  333. }
  334. var err KError
  335. var ok bool
  336. if response.Errors[s.topic] == nil {
  337. s.handleError(ErrIncompleteResponse)
  338. delete(bom.subscriptions, s)
  339. s.rebalance <- none{}
  340. continue
  341. }
  342. if err, ok = response.Errors[s.topic][s.partition]; !ok {
  343. s.handleError(ErrIncompleteResponse)
  344. delete(bom.subscriptions, s)
  345. s.rebalance <- none{}
  346. continue
  347. }
  348. switch err {
  349. case ErrNoError:
  350. block := request.blocks[s.topic][s.partition]
  351. s.updateCommitted(block.offset, block.metadata)
  352. break
  353. case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
  354. delete(bom.subscriptions, s)
  355. s.rebalance <- none{}
  356. default:
  357. s.handleError(err)
  358. delete(bom.subscriptions, s)
  359. s.rebalance <- none{}
  360. }
  361. }
  362. }
  363. func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
  364. r := &OffsetCommitRequest{
  365. Version: 1,
  366. ConsumerGroup: bom.parent.group,
  367. }
  368. for s := range bom.subscriptions {
  369. s.lock.Lock()
  370. if s.dirty {
  371. r.AddBlock(s.topic, s.partition, s.offset, 0, s.metadata)
  372. }
  373. s.lock.Unlock()
  374. }
  375. if len(r.blocks) > 0 {
  376. return r
  377. }
  378. return nil
  379. }
  380. func (bom *brokerOffsetManager) abort(err error) {
  381. _ = bom.broker.Close() // we don't care about the error this might return, we already have one
  382. bom.parent.abandonBroker(bom)
  383. for pom := range bom.subscriptions {
  384. pom.handleError(err)
  385. pom.rebalance <- none{}
  386. }
  387. for s := range bom.updateSubscriptions {
  388. if _, ok := bom.subscriptions[s]; !ok {
  389. s.handleError(err)
  390. s.rebalance <- none{}
  391. }
  392. }
  393. bom.subscriptions = make(map[*partitionOffsetManager]none)
  394. }