offset_manager.go

package sarama

import (
	"sync"
	"time"
)

// Offset Manager

// OffsetManager uses Kafka to store and fetch consumed partition offsets.
type OffsetManager interface {
	// ManagePartition creates a PartitionOffsetManager on the given topic/partition. It will
	// return an error if this OffsetManager is already managing the given topic/partition.
	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)
}

type offsetManager struct {
	client Client
	conf   *Config
	group  string

	lock sync.Mutex
	poms map[string]map[int32]*partitionOffsetManager
	boms map[*Broker]*brokerOffsetManager
}
// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
// It is still necessary to call Close() on the underlying client when finished with the offset manager.
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	om := &offsetManager{
		client: client,
		conf:   client.Config(),
		group:  group,
		poms:   make(map[string]map[int32]*partitionOffsetManager),
		boms:   make(map[*Broker]*brokerOffsetManager),
	}

	return om, nil
}
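
// A minimal usage sketch (not part of this file: `client` stands for any
// configured sarama Client, and the group/topic names are placeholders):
//
//	om, err := NewOffsetManagerFromClient("example-group", client)
//	if err != nil {
//		panic(err)
//	}
//	pom, err := om.ManagePartition("example-topic", 0)
//	if err != nil {
//		panic(err)
//	}
//	defer pom.Close()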
func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
	pom, err := om.newPartitionOffsetManager(topic, partition)
	if err != nil {
		return nil, err
	}

	om.lock.Lock()
	defer om.lock.Unlock()

	topicManagers := om.poms[topic]
	if topicManagers == nil {
		topicManagers = make(map[int32]*partitionOffsetManager)
		om.poms[topic] = topicManagers
	}

	if topicManagers[partition] != nil {
		return nil, ConfigurationError("That topic/partition is already being managed")
	}

	topicManagers[partition] = pom
	return pom, nil
}
// refBrokerOffsetManager returns the brokerOffsetManager for the given broker,
// creating it if necessary, and increments its reference count.
func (om *offsetManager) refBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
	om.lock.Lock()
	defer om.lock.Unlock()

	bom := om.boms[broker]
	if bom == nil {
		bom = om.newBrokerOffsetManager(broker)
		om.boms[broker] = bom
	}

	bom.refs++
	return bom
}

// unrefBrokerOffsetManager decrements the reference count and shuts the
// brokerOffsetManager down (by closing its updateSubscriptions channel)
// once no partitions reference it.
func (om *offsetManager) unrefBrokerOffsetManager(bom *brokerOffsetManager) {
	om.lock.Lock()
	defer om.lock.Unlock()

	bom.refs--

	if bom.refs == 0 {
		close(bom.updateSubscriptions)
		if om.boms[bom.broker] == bom {
			delete(om.boms, bom.broker)
		}
	}
}

// abandonBroker drops a brokerOffsetManager from the index after its broker
// has failed; subscribed partitions will rebalance onto a fresh one.
func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) {
	om.lock.Lock()
	defer om.lock.Unlock()

	delete(om.boms, bom.broker)
}
// Partition Offset Manager

// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
// on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes
// out of scope.
type PartitionOffsetManager interface {
	// Offset returns the current offset and metadata according to the manager; this value has not necessarily
	// been flushed to the cluster yet.
	Offset() (int64, string)

	// SetOffset sets the current offset and metadata according to the manager; this value (or a subsequent update)
	// will eventually be flushed to the cluster based on configuration.
	SetOffset(offset int64, metadata string)

	// Errors returns a read channel of errors that occur during offset management, if enabled. By default,
	// errors are logged and not returned over this channel. If you want to implement any custom error
	// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will return immediately,
	// after which you should wait until the 'errors' channel has been drained and closed.
	// It is required to call this function (or Close) before a PartitionOffsetManager passes out of scope,
	// as it will otherwise leak memory. You must call this before calling Close on the underlying
	// client.
	AsyncClose()

	// Close stops the PartitionOffsetManager from managing offsets. It is required to call this function
	// (or AsyncClose) before a PartitionOffsetManager object passes out of scope, as it will otherwise
	// leak memory. You must call this before calling Close on the underlying client.
	Close() error
}
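
// A sketch of the intended call pattern (assumes `pom` came from
// ManagePartition and `msg` is a consumed *ConsumerMessage; by sarama
// convention the stored offset is the next offset to read, hence the +1).
// Errors() only delivers when Consumer.Return.Errors is enabled, but the
// drain loop below terminates either way once AsyncClose closes the channel:
//
//	pom.SetOffset(msg.Offset+1, "")
//
//	// on shutdown, before closing the underlying client:
//	pom.AsyncClose()
//	for err := range pom.Errors() {
//		Logger.Println(err)
//	}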
type partitionOffsetManager struct {
	parent    *offsetManager
	topic     string
	partition int32

	lock     sync.Mutex
	offset   int64
	metadata string
	dirty    bool
	clean    chan none
	broker   *brokerOffsetManager

	errors    chan *ConsumerError
	rebalance chan none
	dying     chan none
}
func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
	pom := &partitionOffsetManager{
		parent:    om,
		topic:     topic,
		partition: partition,
		clean:     make(chan none),
		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
		rebalance: make(chan none, 1),
		dying:     make(chan none),
	}

	if err := pom.selectBroker(); err != nil {
		return nil, err
	}

	if err := pom.fetchInitialOffset(om.conf.Metadata.Retry.Max); err != nil {
		return nil, err
	}

	pom.broker.updateSubscriptions <- pom

	go withRecover(pom.mainLoop)

	return pom, nil
}
func (pom *partitionOffsetManager) mainLoop() {
	for {
		select {
		case <-pom.rebalance:
			if err := pom.selectBroker(); err != nil {
				pom.handleError(err)
				pom.rebalance <- none{} // retry on the next loop iteration
			} else {
				pom.broker.updateSubscriptions <- pom
			}
		case <-pom.dying:
			if pom.broker != nil {
				// either drain a pending rebalance signal (in which case we
				// are not currently subscribed) or unsubscribe from the
				// broker manager
				select {
				case <-pom.rebalance:
				case pom.broker.updateSubscriptions <- pom:
				}
				pom.parent.unrefBrokerOffsetManager(pom.broker)
			}
			close(pom.errors)
			return
		}
	}
}
func (pom *partitionOffsetManager) selectBroker() error {
	if pom.broker != nil {
		pom.parent.unrefBrokerOffsetManager(pom.broker)
		pom.broker = nil
	}

	var broker *Broker
	var err error

	if err = pom.parent.client.RefreshCoordinator(pom.parent.group); err != nil {
		return err
	}

	if broker, err = pom.parent.client.Coordinator(pom.parent.group); err != nil {
		return err
	}

	pom.broker = pom.parent.refBrokerOffsetManager(broker)
	return nil
}
func (pom *partitionOffsetManager) fetchInitialOffset(retries int) error {
	request := new(OffsetFetchRequest)
	request.Version = 1
	request.ConsumerGroup = pom.parent.group
	request.AddPartition(pom.topic, pom.partition)

	response, err := pom.broker.broker.FetchOffset(request)
	if err != nil {
		return err
	}

	block := response.GetBlock(pom.topic, pom.partition)
	if block == nil {
		return ErrIncompleteResponse
	}

	switch block.Err {
	case ErrNoError:
		pom.offset = block.Offset
		pom.metadata = block.Metadata
		return nil
	case ErrNotCoordinatorForConsumer:
		// note: return block.Err, not err -- err is nil here, so returning
		// it would silently report success
		if retries <= 0 {
			return block.Err
		}
		// the coordinator moved; find the new one and retry
		if err := pom.selectBroker(); err != nil {
			return err
		}
		return pom.fetchInitialOffset(retries - 1)
	case ErrOffsetsLoadInProgress:
		if retries <= 0 {
			return block.Err
		}
		// the coordinator is still loading offsets; back off and retry
		time.Sleep(pom.parent.conf.Metadata.Retry.Backoff)
		return pom.fetchInitialOffset(retries - 1)
	default:
		return block.Err
	}
}
func (pom *partitionOffsetManager) handleError(err error) {
	cErr := &ConsumerError{
		Topic:     pom.topic,
		Partition: pom.partition,
		Err:       err,
	}

	if pom.parent.conf.Consumer.Return.Errors {
		pom.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}
func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
	return pom.errors
}

func (pom *partitionOffsetManager) SetOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	pom.offset = offset
	pom.metadata = metadata
	pom.dirty = true
}

// updateCommitted is called by the broker manager after a successful flush;
// it clears the dirty flag only if no newer SetOffset arrived in the
// meantime, and signals any AsyncClose waiting on a clean state.
func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset == offset && pom.metadata == metadata {
		pom.dirty = false

		select {
		case pom.clean <- none{}:
		default:
		}
	}
}

func (pom *partitionOffsetManager) Offset() (int64, string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	return pom.offset, pom.metadata
}
func (pom *partitionOffsetManager) AsyncClose() {
	go func() {
		pom.lock.Lock()
		dirty := pom.dirty
		pom.lock.Unlock()

		// wait for any uncommitted offset to be flushed before shutting down
		if dirty {
			<-pom.clean
		}

		close(pom.dying)
	}()
}
func (pom *partitionOffsetManager) Close() error {
	pom.AsyncClose()

	var errors ConsumerErrors
	for err := range pom.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}
// Broker Offset Manager

type brokerOffsetManager struct {
	parent              *offsetManager
	broker              *Broker
	timer               *time.Ticker
	updateSubscriptions chan *partitionOffsetManager
	subscriptions       map[*partitionOffsetManager]none
	refs                int
}
func (om *offsetManager) newBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
	bom := &brokerOffsetManager{
		parent:              om,
		broker:              broker,
		timer:               time.NewTicker(om.conf.Consumer.Offsets.CommitInterval),
		updateSubscriptions: make(chan *partitionOffsetManager),
		subscriptions:       make(map[*partitionOffsetManager]none),
	}

	go withRecover(bom.mainLoop)

	return bom
}
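
// The commit cadence used above comes from the client's Config; as a sketch,
// it is set like any other config field before the Client is created:
//
//	conf := NewConfig()
//	conf.Consumer.Offsets.CommitInterval = 10 * time.Second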
func (bom *brokerOffsetManager) mainLoop() {
	for {
		select {
		case <-bom.timer.C:
			if len(bom.subscriptions) > 0 {
				bom.flushToBroker()
			}
		case s, ok := <-bom.updateSubscriptions:
			if !ok {
				bom.timer.Stop()
				return
			}
			// updateSubscriptions toggles membership: a manager already
			// subscribed is removed, otherwise it is added
			if _, ok := bom.subscriptions[s]; ok {
				delete(bom.subscriptions, s)
			} else {
				bom.subscriptions[s] = none{}
			}
		}
	}
}
func (bom *brokerOffsetManager) flushToBroker() {
	request := bom.constructRequest()

	response, err := bom.broker.CommitOffset(request)
	if err != nil {
		bom.abort(err)
		return // response is nil on error; nothing further to process
	}

	for s := range bom.subscriptions {
		var err KError
		var ok bool

		if response.Errors[s.topic] == nil {
			s.handleError(ErrIncompleteResponse)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
			continue
		}
		if err, ok = response.Errors[s.topic][s.partition]; !ok {
			s.handleError(ErrIncompleteResponse)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
			continue
		}

		switch err {
		case ErrNoError:
			block := request.blocks[s.topic][s.partition]
			s.updateCommitted(block.offset, block.metadata)
		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
			// the partition metadata changed; drop the subscription and
			// trigger a rebalance to find the new coordinator
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
		default:
			s.handleError(err)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
		}
	}
}
func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
	r := &OffsetCommitRequest{
		Version:       1,
		ConsumerGroup: bom.parent.group,
	}

	for s := range bom.subscriptions {
		s.lock.Lock()
		r.AddBlock(s.topic, s.partition, s.offset, 0, s.metadata)
		s.lock.Unlock()
	}

	return r
}
func (bom *brokerOffsetManager) abort(err error) {
	_ = bom.broker.Close() // we don't care about the error this might return, we already have one
	bom.parent.abandonBroker(bom)

	for pom := range bom.subscriptions {
		pom.handleError(err)
		pom.rebalance <- none{}
	}

	// drain updateSubscriptions (it is closed once the last partition unrefs
	// us) so that late subscribers also get rebalanced
	for s := range bom.updateSubscriptions {
		if _, ok := bom.subscriptions[s]; !ok {
			s.handleError(err)
			s.rebalance <- none{}
		}
	}
}