offset_manager.go

package sarama

import (
	"sync"
	"time"
)

// Offset Manager

// OffsetManager uses Kafka to store and fetch consumed partition offsets.
type OffsetManager interface {
	// ManagePartition creates a PartitionOffsetManager on the given topic/partition. It will
	// return an error if this OffsetManager is already managing the given topic/partition.
	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)
}
type offsetManager struct {
	client Client
	conf   *Config
	group  string

	lock sync.Mutex
	poms map[string]map[int32]*partitionOffsetManager
	boms map[*Broker]*brokerOffsetManager
}

// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
// It is still necessary to call Close() on the underlying client when finished with the partition manager.
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	om := &offsetManager{
		client: client,
		conf:   client.Config(),
		group:  group,
		poms:   make(map[string]map[int32]*partitionOffsetManager),
		boms:   make(map[*Broker]*brokerOffsetManager),
	}

	return om, nil
}
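
// A minimal usage sketch (illustrative only, not part of the API surface; it
// assumes a live Client named `client` and a group named "example-group",
// with error handling elided):
//
//	om, _ := NewOffsetManagerFromClient("example-group", client)
//	pom, _ := om.ManagePartition("my-topic", 0)
//	defer pom.Close()
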
func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
	pom, err := om.newPartitionOffsetManager(topic, partition)
	if err != nil {
		return nil, err
	}

	om.lock.Lock()
	defer om.lock.Unlock()

	topicManagers := om.poms[topic]
	if topicManagers == nil {
		topicManagers = make(map[int32]*partitionOffsetManager)
		om.poms[topic] = topicManagers
	}

	if topicManagers[partition] != nil {
		return nil, ConfigurationError("That topic/partition is already being managed")
	}

	topicManagers[partition] = pom
	return pom, nil
}
// refBrokerOffsetManager returns the broker offset manager for the given broker,
// creating one if necessary, and increments its reference count.
func (om *offsetManager) refBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
	om.lock.Lock()
	defer om.lock.Unlock()

	bom := om.boms[broker]
	if bom == nil {
		bom = om.newBrokerOffsetManager(broker)
		om.boms[broker] = bom
	}

	bom.refs++
	return bom
}

// unrefBrokerOffsetManager decrements the reference count and, once it reaches
// zero, shuts the broker offset manager down by closing its subscription channel.
func (om *offsetManager) unrefBrokerOffsetManager(bom *brokerOffsetManager) {
	om.lock.Lock()
	defer om.lock.Unlock()

	bom.refs--

	if bom.refs == 0 {
		close(bom.updateSubscriptions)
		if om.boms[bom.broker] == bom {
			delete(om.boms, bom.broker)
		}
	}
}

func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) {
	om.lock.Lock()
	defer om.lock.Unlock()

	delete(om.boms, bom.broker)
}
// Partition Offset Manager

// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
// on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes
// out of scope.
type PartitionOffsetManager interface {
	// Offset returns the current offset and metadata according to the manager; this value has not necessarily
	// been flushed to the cluster yet.
	Offset() (int64, string)

	// SetOffset sets the current offset and metadata according to the manager; this value (or a subsequent update)
	// will eventually be flushed to the cluster based on configuration.
	SetOffset(offset int64, metadata string)

	// Errors returns a read channel of errors that occur during offset management, if enabled. By default,
	// errors are logged and not returned over this channel. If you want to implement any custom error
	// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will return immediately,
	// after which you should wait until the 'errors' channel has been drained and closed.
	// It is required to call this function, or Close, before a PartitionOffsetManager object passes out of
	// scope, as it will otherwise leak memory. You must call this before calling Close on the underlying
	// client.
	AsyncClose()

	// Close stops the PartitionOffsetManager from managing offsets. It is required to call this function
	// (or AsyncClose) before a PartitionOffsetManager object passes out of scope, as it will otherwise
	// leak memory. You must call this before calling Close on the underlying client.
	Close() error
}
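
// A hedged lifecycle sketch (illustrative only; `pom` is assumed to come from
// ManagePartition, and Consumer.Return.Errors is assumed to be true, so the
// Errors() channel must be drained after AsyncClose):
//
//	offset, _ := pom.Offset()
//	pom.SetOffset(offset+1, "processed")
//	pom.AsyncClose()
//	for err := range pom.Errors() {
//		Logger.Println(err)
//	}
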
type partitionOffsetManager struct {
	parent    *offsetManager
	topic     string
	partition int32

	lock     sync.Mutex
	offset   int64
	metadata string

	broker    *brokerOffsetManager
	errors    chan *ConsumerError
	rebalance chan none
	dying     chan none
}

func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
	pom := &partitionOffsetManager{
		parent:    om,
		topic:     topic,
		partition: partition,
		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
		rebalance: make(chan none, 1),
		dying:     make(chan none),
	}

	if err := pom.selectBroker(); err != nil {
		return nil, err
	}

	if err := pom.fetchInitialOffset(om.conf.Metadata.Retry.Max); err != nil {
		return nil, err
	}

	go withRecover(pom.mainLoop)

	return pom, nil
}
func (pom *partitionOffsetManager) mainLoop() {
	for {
		select {
		case <-pom.rebalance:
			// Re-select the coordinating broker; on failure, report the error
			// and queue another rebalance attempt.
			if err := pom.selectBroker(); err != nil {
				pom.handleError(err)
				pom.rebalance <- none{}
			}
		case <-pom.dying:
			if pom.broker != nil {
				// Unsubscribe from the broker manager (the send toggles our
				// subscription off) unless a rebalance is already pending.
				select {
				case <-pom.rebalance:
				case pom.broker.updateSubscriptions <- pom:
				}
				pom.parent.unrefBrokerOffsetManager(pom.broker)
			}
			close(pom.errors)
			return
		}
	}
}
func (pom *partitionOffsetManager) selectBroker() error {
	if pom.broker != nil {
		pom.parent.unrefBrokerOffsetManager(pom.broker)
		pom.broker = nil
	}

	var broker *Broker
	var err error

	if err = pom.parent.client.RefreshCoordinator(pom.parent.group); err != nil {
		return err
	}

	if broker, err = pom.parent.client.Coordinator(pom.parent.group); err != nil {
		return err
	}

	pom.broker = pom.parent.refBrokerOffsetManager(broker)
	pom.broker.updateSubscriptions <- pom
	return nil
}
func (pom *partitionOffsetManager) fetchInitialOffset(retries int) error {
	request := new(OffsetFetchRequest)
	request.Version = 1
	request.ConsumerGroup = pom.parent.group
	request.AddPartition(pom.topic, pom.partition)

	response, err := pom.broker.broker.FetchOffset(request)
	if err != nil {
		return err
	}

	block := response.GetBlock(pom.topic, pom.partition)
	if block == nil {
		return ErrIncompleteResponse
	}

	switch block.Err {
	case ErrNoError:
		pom.offset = block.Offset
		pom.metadata = block.Metadata
		return nil
	case ErrNotCoordinatorForConsumer:
		// Note: return block.Err here, not err, which is necessarily nil at this point.
		if retries <= 0 {
			return block.Err
		}
		if err := pom.selectBroker(); err != nil {
			return err
		}
		return pom.fetchInitialOffset(retries - 1)
	case ErrOffsetsLoadInProgress:
		if retries <= 0 {
			return block.Err
		}
		time.Sleep(pom.parent.conf.Metadata.Retry.Backoff)
		return pom.fetchInitialOffset(retries - 1)
	default:
		return block.Err
	}
}
func (pom *partitionOffsetManager) handleError(err error) {
	cErr := &ConsumerError{
		Topic:     pom.topic,
		Partition: pom.partition,
		Err:       err,
	}

	if pom.parent.conf.Consumer.Return.Errors {
		pom.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
	return pom.errors
}

func (pom *partitionOffsetManager) SetOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	pom.offset = offset
	pom.metadata = metadata
}

func (pom *partitionOffsetManager) Offset() (int64, string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	return pom.offset, pom.metadata
}

func (pom *partitionOffsetManager) AsyncClose() {
	close(pom.dying)
}

func (pom *partitionOffsetManager) Close() error {
	pom.AsyncClose()

	var errors ConsumerErrors
	for err := range pom.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}
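
// A hedged usage note (illustrative): Close drains the errors channel itself
// and, if any errors accumulated, returns them as a single ConsumerErrors value:
//
//	if err := pom.Close(); err != nil {
//		for _, cerr := range err.(ConsumerErrors) {
//			Logger.Println(cerr)
//		}
//	}
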
// Broker Offset Manager

type brokerOffsetManager struct {
	parent              *offsetManager
	broker              *Broker
	timer               *time.Ticker
	updateSubscriptions chan *partitionOffsetManager
	subscriptions       map[*partitionOffsetManager]none
	refs                int
}

func (om *offsetManager) newBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
	bom := &brokerOffsetManager{
		parent:              om,
		broker:              broker,
		timer:               time.NewTicker(om.conf.Consumer.Offsets.CommitInterval),
		updateSubscriptions: make(chan *partitionOffsetManager),
		subscriptions:       make(map[*partitionOffsetManager]none),
	}

	go withRecover(bom.mainLoop)

	return bom
}
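
// Commit frequency is driven by the ticker above. A minimal configuration
// sketch (illustrative; assumes the rest of the config is left at defaults):
//
//	conf := NewConfig()
//	conf.Consumer.Offsets.CommitInterval = 5 * time.Second
//	conf.Consumer.Return.Errors = true
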
func (bom *brokerOffsetManager) mainLoop() {
	for {
		select {
		case <-bom.timer.C:
			bom.flushToBroker()
		case s, ok := <-bom.updateSubscriptions:
			if !ok {
				bom.timer.Stop()
				return
			}
			// A send on updateSubscriptions toggles the subscription: it
			// removes the partition manager if present, and adds it otherwise.
			if _, ok := bom.subscriptions[s]; ok {
				delete(bom.subscriptions, s)
			} else {
				bom.subscriptions[s] = none{}
			}
		}
	}
}
func (bom *brokerOffsetManager) flushToBroker() {
	request := bom.constructRequest()

	response, err := bom.broker.CommitOffset(request)
	if err != nil {
		bom.abort(err)
		return // without this return, the loop below would dereference a nil response
	}

	for s := range bom.subscriptions {
		var err KError
		var ok bool

		if response.Errors[s.topic] == nil {
			s.handleError(ErrIncompleteResponse)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
			continue
		}
		if err, ok = response.Errors[s.topic][s.partition]; !ok {
			s.handleError(ErrIncompleteResponse)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
			continue
		}

		switch err {
		case ErrNoError:
			break
		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
			// Retriable errors: drop the subscription and let the partition
			// manager rebalance onto a fresh coordinator.
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
		default:
			s.handleError(err)
			delete(bom.subscriptions, s)
			s.rebalance <- none{}
		}
	}
}
func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
	r := &OffsetCommitRequest{
		Version:       1,
		ConsumerGroup: bom.parent.group,
	}

	for s := range bom.subscriptions {
		s.lock.Lock()
		r.AddBlock(s.topic, s.partition, s.offset, 0, s.metadata)
		s.lock.Unlock()
	}

	return r
}
func (bom *brokerOffsetManager) abort(err error) {
	_ = bom.broker.Close() // we don't care about the error this might return, we already have one
	bom.parent.abandonBroker(bom)

	for pom := range bom.subscriptions {
		pom.handleError(err)
		pom.rebalance <- none{}
	}

	// Drain updateSubscriptions until it is closed (which happens once every
	// partition manager has unreferenced this broker manager), notifying any
	// managers that were not already in the subscription set.
	for s := range bom.updateSubscriptions {
		if _, ok := bom.subscriptions[s]; !ok {
			s.handleError(err)
			s.rebalance <- none{}
		}
	}
}