offset_manager.go

package sarama

import (
	"sync"
	"time"
)

// Offset Manager

// OffsetManager uses Kafka to store and fetch consumed partition offsets.
type OffsetManager interface {
	// ManagePartition creates a PartitionOffsetManager on the given topic/partition. It will
	// return an error if this OffsetManager is already managing the given topic/partition.
	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)
}

type offsetManager struct {
	client Client
	conf   *Config
	group  string

	lock sync.Mutex
	poms map[string]map[int32]*partitionOffsetManager
	boms map[*Broker]*brokerOffsetManager
}

// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
// It is still necessary to call Close() on the underlying client when finished with the offset manager.
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	om := &offsetManager{
		client: client,
		conf:   client.Config(),
		group:  group,
		poms:   make(map[string]map[int32]*partitionOffsetManager),
		boms:   make(map[*Broker]*brokerOffsetManager),
	}

	return om, nil
}
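
// A minimal usage sketch for the API above; the broker address "localhost:9092",
// the group "my-group", and the topic "my-topic" are placeholders, not defaults:
//
//	client, err := NewClient([]string{"localhost:9092"}, nil)
//	if err != nil {
//		panic(err)
//	}
//	defer client.Close() // close the client only after the offset manager is done
//
//	om, err := NewOffsetManagerFromClient("my-group", client)
//	if err != nil {
//		panic(err)
//	}
//	pom, err := om.ManagePartition("my-topic", 0)
//	if err != nil {
//		panic(err)
//	}
//	defer pom.Close() // required; see PartitionOffsetManager below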

func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
	pom, err := om.newPartitionOffsetManager(topic, partition)
	if err != nil {
		return nil, err
	}

	om.lock.Lock()
	defer om.lock.Unlock()

	topicManagers := om.poms[topic]
	if topicManagers == nil {
		topicManagers = make(map[int32]*partitionOffsetManager)
		om.poms[topic] = topicManagers
	}

	if topicManagers[partition] != nil {
		return nil, ConfigurationError("That topic/partition is already being managed")
	}

	topicManagers[partition] = pom
	return pom, nil
}

func (om *offsetManager) refBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
	om.lock.Lock()
	defer om.lock.Unlock()

	bom := om.boms[broker]
	if bom == nil {
		bom = om.newBrokerOffsetManager(broker)
		om.boms[broker] = bom
	}

	bom.refs++

	return bom
}

func (om *offsetManager) unrefBrokerOffsetManager(bom *brokerOffsetManager) {
	om.lock.Lock()
	defer om.lock.Unlock()

	bom.refs--

	if bom.refs == 0 {
		close(bom.updateSubscriptions)
		if om.boms[bom.broker] == bom {
			delete(om.boms, bom.broker)
		}
	}
}

func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) {
	om.lock.Lock()
	defer om.lock.Unlock()

	delete(om.boms, bom.broker)
}

// Partition Offset Manager

// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
// on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes
// out of scope.
type PartitionOffsetManager interface {
	// Offset returns the current offset and metadata according to the manager; this value has not necessarily
	// been flushed to the cluster yet.
	Offset() (int64, string)

	// SetOffset sets the current offset and metadata according to the manager; this value (or a subsequent update)
	// will eventually be flushed to the cluster based on configuration.
	SetOffset(offset int64, metadata string)

	// Errors returns a read channel of errors that occur during offset management, if enabled. By default,
	// errors are logged and not returned over this channel. If you want to implement any custom error
	// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will return immediately,
	// after which you should wait until the 'errors' channel has been drained and closed.
	// It is required to call this function, or Close, before a PartitionOffsetManager object passes out of
	// scope, as it will otherwise leak memory. You must call this before calling Close on the underlying
	// client.
	AsyncClose()

	// Close stops the PartitionOffsetManager from managing offsets. It is required to call this function
	// (or AsyncClose) before a PartitionOffsetManager object passes out of scope, as it will otherwise
	// leak memory. You must call this before calling Close on the underlying client.
	Close() error
}
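
// A sketch of the lifecycle described above; `messages` and `process` stand in
// for a real consume loop and are not part of this package. Committing
// msg.Offset+1 follows the Kafka convention of storing the next offset to read:
//
//	offset, _ := pom.Offset() // current position; e.g. to seed a consumer's resume point
//
//	for msg := range messages {
//		process(msg)
//		pom.SetOffset(msg.Offset+1, "")
//	}
//
//	if err := pom.Close(); err != nil { // drains and returns any commit errors
//		log.Println(err)
//	}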

type partitionOffsetManager struct {
	parent    *offsetManager
	topic     string
	partition int32

	lock     sync.Mutex
	offset   int64
	metadata string
	dirty    bool
	clean    chan none
	broker   *brokerOffsetManager

	errors    chan *ConsumerError
	rebalance chan none
	dying     chan none
}

func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
	pom := &partitionOffsetManager{
		parent:    om,
		topic:     topic,
		partition: partition,
		clean:     make(chan none),
		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
		rebalance: make(chan none, 1),
		dying:     make(chan none),
	}

	if err := pom.selectBroker(); err != nil {
		return nil, err
	}

	if err := pom.fetchInitialOffset(om.conf.Metadata.Retry.Max); err != nil {
		return nil, err
	}

	go withRecover(pom.mainLoop)

	return pom, nil
}

func (pom *partitionOffsetManager) mainLoop() {
	for {
		select {
		case <-pom.rebalance:
			if err := pom.selectBroker(); err != nil {
				pom.handleError(err)
				pom.rebalance <- none{}
			}
		case <-pom.dying:
			if pom.broker != nil {
				// either drain a pending rebalance signal, or toggle ourselves
				// off the broker's subscription set, before releasing our ref
				select {
				case <-pom.rebalance:
				case pom.broker.updateSubscriptions <- pom:
				}
				pom.parent.unrefBrokerOffsetManager(pom.broker)
			}
			close(pom.errors)
			return
		}
	}
}

func (pom *partitionOffsetManager) selectBroker() error {
	if pom.broker != nil {
		pom.parent.unrefBrokerOffsetManager(pom.broker)
		pom.broker = nil
	}

	var broker *Broker
	var err error

	if err = pom.parent.client.RefreshCoordinator(pom.parent.group); err != nil {
		return err
	}

	if broker, err = pom.parent.client.Coordinator(pom.parent.group); err != nil {
		return err
	}

	pom.broker = pom.parent.refBrokerOffsetManager(broker)
	pom.broker.updateSubscriptions <- pom
	return nil
}

func (pom *partitionOffsetManager) fetchInitialOffset(retries int) error {
	request := new(OffsetFetchRequest)
	request.Version = 1
	request.ConsumerGroup = pom.parent.group
	request.AddPartition(pom.topic, pom.partition)

	response, err := pom.broker.broker.FetchOffset(request)
	if err != nil {
		return err
	}

	block := response.GetBlock(pom.topic, pom.partition)
	if block == nil {
		return ErrIncompleteResponse
	}

	switch block.Err {
	case ErrNoError:
		pom.offset = block.Offset
		pom.metadata = block.Metadata
		return nil
	case ErrNotCoordinatorForConsumer:
		if retries <= 0 {
			return block.Err
		}
		if err := pom.selectBroker(); err != nil {
			return err
		}
		return pom.fetchInitialOffset(retries - 1)
	case ErrOffsetsLoadInProgress:
		if retries <= 0 {
			return block.Err
		}
		time.Sleep(pom.parent.conf.Metadata.Retry.Backoff)
		return pom.fetchInitialOffset(retries - 1)
	default:
		return block.Err
	}
}

func (pom *partitionOffsetManager) handleError(err error) {
	cErr := &ConsumerError{
		Topic:     pom.topic,
		Partition: pom.partition,
		Err:       err,
	}

	if pom.parent.conf.Consumer.Return.Errors {
		pom.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
	return pom.errors
}

func (pom *partitionOffsetManager) SetOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	pom.offset = offset
	pom.metadata = metadata
	pom.dirty = true
}

func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset == offset && pom.metadata == metadata {
		pom.dirty = false

		// non-blocking notify: only AsyncClose ever waits on `clean`
		select {
		case pom.clean <- none{}:
		default:
		}
	}
}

func (pom *partitionOffsetManager) Offset() (int64, string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	return pom.offset, pom.metadata
}

func (pom *partitionOffsetManager) AsyncClose() {
	go func() {
		pom.lock.Lock()
		dirty := pom.dirty
		pom.lock.Unlock()

		// wait for any uncommitted offset to be flushed before shutting down
		if dirty {
			<-pom.clean
		}

		close(pom.dying)
	}()
}

func (pom *partitionOffsetManager) Close() error {
	pom.AsyncClose()

	var errors ConsumerErrors
	for err := range pom.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

// Broker Offset Manager

type brokerOffsetManager struct {
	parent              *offsetManager
	broker              *Broker
	timer               *time.Ticker
	updateSubscriptions chan *partitionOffsetManager
	subscriptions       map[*partitionOffsetManager]none
	refs                int
}

func (om *offsetManager) newBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
	bom := &brokerOffsetManager{
		parent:              om,
		broker:              broker,
		timer:               time.NewTicker(om.conf.Consumer.Offsets.CommitInterval),
		updateSubscriptions: make(chan *partitionOffsetManager),
		subscriptions:       make(map[*partitionOffsetManager]none),
	}

	go withRecover(bom.mainLoop)

	return bom
}
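
// The commit ticker above is driven by configuration; a sketch of the relevant
// settings (the values shown are illustrative, not defaults):
//
//	conf := NewConfig()
//	conf.Consumer.Offsets.CommitInterval = 10 * time.Second // flush cadence for dirty offsets
//	conf.Consumer.Return.Errors = true                      // surface commit errors on Errors()
//	client, err := NewClient([]string{"localhost:9092"}, conf)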

func (bom *brokerOffsetManager) mainLoop() {
	for {
		select {
		case <-bom.timer.C:
			bom.flushToBroker()
		case s, ok := <-bom.updateSubscriptions:
			if !ok {
				bom.timer.Stop()
				return
			}
			// updateSubscriptions toggles membership: a known pom is being
			// removed, an unknown one is being added
			if _, ok := bom.subscriptions[s]; ok {
				delete(bom.subscriptions, s)
			} else {
				bom.subscriptions[s] = none{}
			}
		}
	}
}

func (bom *brokerOffsetManager) flushToBroker() {
	request := bom.constructRequest()

	response, err := bom.broker.CommitOffset(request)
	if err != nil {
		bom.abort(err)
		return
	}

	for s := range bom.subscriptions {
		var err KError
		var ok bool

		if response.Errors[s.topic] == nil {
			s.handleError(ErrIncompleteResponse)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
			continue
		}
		if err, ok = response.Errors[s.topic][s.partition]; !ok {
			s.handleError(ErrIncompleteResponse)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
			continue
		}

		switch err {
		case ErrNoError:
			block := request.blocks[s.topic][s.partition]
			s.updateCommitted(block.offset, block.metadata)
		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
			// not a fatal error; trigger a rebalance so the pom picks a new coordinator
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
		default:
			s.handleError(err)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
		}
	}
}

func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
	r := &OffsetCommitRequest{
		Version:       1,
		ConsumerGroup: bom.parent.group,
	}

	for s := range bom.subscriptions {
		s.lock.Lock()
		r.AddBlock(s.topic, s.partition, s.offset, 0, s.metadata)
		s.lock.Unlock()
	}

	return r
}

func (bom *brokerOffsetManager) abort(err error) {
	_ = bom.broker.Close() // we don't care about the error this might return, we already have one

	bom.parent.abandonBroker(bom)

	for pom := range bom.subscriptions {
		pom.handleError(err)
		pom.rebalance <- none{}
	}

	// keep draining subscription updates until every pom has unreffed us and the
	// channel is closed, forwarding the error to any newly-subscribing poms
	for s := range bom.updateSubscriptions {
		if _, ok := bom.subscriptions[s]; !ok {
			s.handleError(err)
			s.rebalance <- none{}
		}
	}
}